Skip to content

Commit

Permalink
Add logic to scale to zero on invalid offset even with earliest offse…
Browse files Browse the repository at this point in the history
…tResetPolicy (kedacore#5689)

Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
  • Loading branch information
dttung2905 committed May 15, 2024
1 parent 613919b commit a375d17
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features:
### Improvements

- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))

### Fixes

Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}
producerOffset := producerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return producerOffset, producerOffset, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,9 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return latestOffset, latestOffset, nil
}

Expand Down
105 changes: 93 additions & 12 deletions tests/scalers/apache_kafka/apache_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ spec:
lagThreshold: '1'
offsetResetPolicy: 'latest'`

invalidOffsetScaledObjectTemplate = `
invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
Expand Down Expand Up @@ -243,6 +243,44 @@ spec:
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
pollingInterval: 5
cooldownPeriod: 0
scaleTargetRef:
name: {{.DeploymentName}}
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
triggers:
- type: kafka
metadata:
topic: {{.TopicName}}
bootstrapServers: {{.BootstrapServer}}
consumerGroup: {{.ResetPolicy}}
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'earliest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
Expand Down Expand Up @@ -407,8 +445,10 @@ func TestScaler(t *testing.T) {
testEarliestPolicy(t, kc, data)
testLatestPolicy(t, kc, data)
testMultiTopic(t, kc, data)
testZeroOnInvalidOffset(t, kc, data)
testOneOnInvalidOffset(t, kc, data)
testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testPersistentLag(t, kc, data)
testScalingOnlyPartitionsWithLag(t, kc, data)
}
Expand Down Expand Up @@ -509,7 +549,7 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) {
"replica count should be %d after 2 minute", 2)
}

func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopic: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
Expand All @@ -518,14 +558,54 @@ func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templa
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup)
publishMessage(t, oneInvalidOffsetTopic)

// Should scale to 0
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10),
"replica count should be %d after 10 minute", 0)
}

func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopic: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
Expand All @@ -534,8 +614,9 @@ func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templat
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
Expand Down Expand Up @@ -570,7 +651,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
t.Log("--- testing persistentLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
data.Commit = StringTrue
data.TopicName = persistentLagTopic
Expand All @@ -583,7 +664,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
publishMessage(t, persistentLagTopic)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)
// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
// This is to simulate inability to consume from topic
// Scaled Object remains unchanged
KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
Expand Down Expand Up @@ -613,7 +694,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da
t.Log("--- testing limitToPartitionsWithLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
commitPartition(t, limitToPartitionsWithLagTopic, "latest")

data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
Expand Down
109 changes: 95 additions & 14 deletions tests/scalers/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ spec:
lagThreshold: '1'
offsetResetPolicy: 'latest'`

invalidOffsetScaledObjectTemplate = `
invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
Expand Down Expand Up @@ -242,6 +242,44 @@ spec:
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'latest'`

invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObjectName}}
namespace: {{.TestNamespace}}
labels:
app: {{.DeploymentName}}
spec:
pollingInterval: 5
cooldownPeriod: 0
scaleTargetRef:
name: {{.DeploymentName}}
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
triggers:
- type: kafka
metadata:
topic: {{.TopicName}}
bootstrapServers: {{.BootstrapServer}}
consumerGroup: {{.ResetPolicy}}
lagThreshold: '1'
scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}'
offsetResetPolicy: 'earliest'`

persistentLagScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
Expand Down Expand Up @@ -406,8 +444,10 @@ func TestScaler(t *testing.T) {
testEarliestPolicy(t, kc, data)
testLatestPolicy(t, kc, data)
testMultiTopic(t, kc, data)
testZeroOnInvalidOffset(t, kc, data)
testOneOnInvalidOffset(t, kc, data)
testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t, kc, data)
testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t, kc, data)
testPersistentLag(t, kc, data)
testScalingOnlyPartitionsWithLag(t, kc, data)
}
Expand Down Expand Up @@ -507,33 +547,74 @@ func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) {
"replica count should be %d after 2 minute", 2)
}

func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopic: scale out ---")
func testZeroOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopic: scale out ---")
func testZeroOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing zeroInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = zeroInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringTrue
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Shouldn't scale pods
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30)
}

func testOneOnInvalidOffsetWithLatestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithLatestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithLatestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)

commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup)
publishMessage(t, oneInvalidOffsetTopic)

// Should scale to 0
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10),
"replica count should be %d after 10 minute", 0)
}

func testOneOnInvalidOffsetWithEarliestOffsetResetPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing oneInvalidOffsetTopicWithEarliestOffsetResetPolicy: scale out ---")
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup)
data.Commit = StringTrue
data.TopicName = oneInvalidOffsetTopic
data.ResetPolicy = invalidOffsetGroup
data.ScaleToZeroOnInvalid = StringFalse
KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate)
KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate)
publishMessage(t, oneInvalidOffsetTopic) // So that the latest offset is not 0
KubectlApplyWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)
defer KubectlDeleteWithTemplate(t, data, "invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate", invalidOffsetWithEarliestOffsetResetPolicyScaledObjectTemplate)

// Should scale to 1
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
Expand Down Expand Up @@ -568,7 +649,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
t.Log("--- testing persistentLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", persistentLagTopic, persistentLagGroup)
data.Commit = StringTrue
data.TopicName = persistentLagTopic
Expand All @@ -581,7 +662,7 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData
publishMessage(t, persistentLagTopic)
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2),
"replica count should be %d after 2 minute", 1)
// Recreate Deployment to delibrately assign different consumer group to deployment and scaled object
// Recreate Deployment to deliberately assign different consumer group to deployment and scaled object
// This is to simulate inability to consume from topic
// Scaled Object remains unchanged
KubernetesScaleDeployment(t, kc, deploymentName, 0, testNamespace)
Expand Down Expand Up @@ -611,7 +692,7 @@ func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, da
t.Log("--- testing limitToPartitionsWithLag: no scale out ---")

// Simulate Consumption from topic by consumer group
// To avoid edge case where where scaling could be effectively disabled (Consumer never makes a commit)
// To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit)
commitPartition(t, limitToPartitionsWithLagTopic, "latest")

data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup)
Expand Down

0 comments on commit a375d17

Please sign in to comment.