Skip to content

Commit

Permalink
Add integration test for keep_firing_for field
Browse files Browse the repository at this point in the history
Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>
  • Loading branch information
mustafain117 committed Mar 21, 2024
1 parent 7140b26 commit bb37d29
Showing 1 changed file with 166 additions and 4 deletions.
170 changes: 166 additions & 4 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"math"
"math/rand"
Expand All @@ -19,10 +20,7 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ruler"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
Expand All @@ -37,6 +35,8 @@ import (
"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestRulerAPI(t *testing.T) {
Expand Down Expand Up @@ -1038,6 +1038,168 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
})
}

func TestRulerKeepFiring(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
map[string]string{
// Since we're not going to run any rule (our only rule is invalid), we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "2h",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

"-querier.max-fetched-chunks-per-query": "50",
},
)

const namespace = "test"
const user = "user"

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

expression := "vector(1) > 0" // Alert will fire
groupName := "rule_group_1"
ruleName := "rule_keep_firing"

require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))

m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
require.NoError(t, err)
require.NotEmpty(t, groups)
require.Equal(t, 1, len(groups[0].Rules))
alert := parseAlertFromRule(t, groups[0].Rules[0])
require.Equal(t, float64(10), alert.KeepFiringFor)
require.Equal(t, 1, len(alert.Alerts))
require.Empty(t, alert.Alerts[0].KeepFiringSince) //Alert expression not resolved, keepFiringSince should be empty

expression = "vector(1) > 1" // Resolve, should keep firing for set duration
ts := time.Now()
require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
require.NoError(t, err)
require.NotEmpty(t, updatedGroups)
require.Equal(t, 1, len(updatedGroups[0].Rules))

alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
require.Equal(t, "firing", alert.State)
require.Equal(t, float64(10), alert.KeepFiringFor)
require.Equal(t, 1, len(alert.Alerts))
require.NotEmpty(t, alert.Alerts[0].KeepFiringSince)
require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be after expression is resolved")

time.Sleep(10 * time.Second) // Sleep beyond keepFiringFor time
updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
require.NoError(t, err)
require.NotEmpty(t, updatedGroups)
require.Equal(t, 1, len(updatedGroups[0].Rules))
alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
require.Equal(t, 0, len(alert.Alerts)) // alert should be resolved once keepFiringFor time expires
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
responseJson, err := json.Marshal(rules)
require.NoError(t, err)

alertResp := &alertingRule{}
require.NoError(t, json.Unmarshal(responseJson, alertResp))
return alertResp
}

type alertingRule struct {
// State can be "pending", "firing", "inactive".
State string `json:"state"`
Name string `json:"name"`
Query string `json:"query"`
Duration float64 `json:"duration"`
KeepFiringFor float64 `json:"keepFiringFor"`
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
Alerts []*Alert `json:"alerts"`
Health string `json:"health"`
LastError string `json:"lastError"`
Type v1.RuleType `json:"type"`
LastEvaluation time.Time `json:"lastEvaluation"`
EvaluationTime float64 `json:"evaluationTime"`
}

// Alert has info for an alert.
type Alert struct {
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
State string `json:"state"`
ActiveAt *time.Time `json:"activeAt"`
KeepFiringSince *time.Time `json:"keepFiringSince,omitempty"`
Value string `json:"value"`
}

func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

recordNode.SetString(ruleName)
exprNode.SetString(expression)

return rulefmt.RuleGroup{
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
Alert: recordNode,
Expr: exprNode,
KeepFiringFor: keepFiring,
}},
}
}

func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
}
Expand Down

0 comments on commit bb37d29

Please sign in to comment.