Add conditional read-after-write support to rules evaluation #7142

Merged 12 commits on Jan 22, 2024
11 changes: 10 additions & 1 deletion development/mimir-ingest-storage/config/example-rules.yaml
@@ -1,9 +1,18 @@
 # Example rules file to load to Mimir via the ruler API.
 groups:
   - name: alerts
+    # Frequently evaluate rules.
+    interval: 10s
     rules:
+      # The following recording rule is independent.
+      - record: cortex_build_info:sum
+        expr: sum(cortex_build_info)
+      # The following recording rule is used by the AlwaysFiring alert.
+      - record: up:count
+        expr: count(up)
       - alert: AlwaysFiring
-        expr: count(up) >= 0
+        expr: up:count >= 0
         for: 10s
         labels:
           severity: page
         annotations:
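The comments in this hunk are the point of the PR: `AlwaysFiring` now reads `up:count`, a series written by the recording rule evaluated just before it in the same group, so its query must observe that write (read-after-write), while `cortex_build_info:sum` is safe to evaluate with eventual consistency. As a rough illustration of what detecting such a dependency involves, here is a minimal standalone sketch using the PromQL parser; the `dependsOn` helper is hypothetical and not code from this PR:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

// dependsOn reports whether expr references any series produced by the given
// recording rules. Simplified sketch: real dependency analysis also has to
// account for label matchers and rule ordering within the group.
func dependsOn(expr string, recorded map[string]bool) (bool, error) {
	node, err := parser.ParseExpr(expr)
	if err != nil {
		return false, err
	}
	found := false
	parser.Inspect(node, func(n parser.Node, _ []parser.Node) error {
		if vs, ok := n.(*parser.VectorSelector); ok && recorded[vs.Name] {
			found = true
		}
		return nil
	})
	return found, nil
}

func main() {
	recorded := map[string]bool{"up:count": true, "cortex_build_info:sum": true}
	ok, _ := dependsOn("up:count >= 0", recorded)
	fmt.Println(ok) // true: AlwaysFiring reads a series recorded by the same group
}
```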
5 changes: 5 additions & 0 deletions development/mimir-ingest-storage/config/mimir.yaml
@@ -49,6 +49,11 @@ ruler:
   rule_path: /data/ruler
   # Each ruler is configured to route alerts to the Alertmanager running within the same component.
   alertmanager_url: http://localhost:8080/alertmanager
+  # Force the ruler to restore the state of any alert with a "for" period longer than 1s.
+  for_grace_period: 1s
+  # Evaluate rules via query-frontend (remote rule evaluation).
+  query_frontend:
+    address: dns:///mimir-read-1:9095
 
 ruler_storage:
   s3:
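The `dns:///` prefix in the address above is a gRPC target URI: the client uses gRPC's DNS resolver to re-resolve the hostname and, with a suitable balancing policy, spreads requests across every backend behind the name. A standalone sketch of dialing such a target (illustrative only; the round-robin service-config literal is an assumption, not taken from Mimir's client code):

```go
package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// "dns:///" tells gRPC to resolve mimir-read-1:9095 via DNS; with a
	// round_robin policy the client balances across all resolved addresses.
	conn, err := grpc.Dial("dns:///mimir-read-1:9095",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```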
2 changes: 1 addition & 1 deletion go.mod
@@ -257,7 +257,7 @@ require (
 )
 
 // Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240122165117-baa5d82b5bc8
 
 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet:
4 changes: 2 additions & 2 deletions go.sum
@@ -553,8 +553,8 @@ github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPft
 github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
 github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801 h1:65eoE+Cwgi8PS+TBmdBn3xtS/JFeuTImzQI4GNDrhTQ=
-github.com/grafana/mimir-prometheus v0.0.0-20240116133529-e3a486e7f801/go.mod h1:W4s/zaz2ypTeyg7h7HDJ4/g0+p5tXBWJ6ToK3g0a5zs=
+github.com/grafana/mimir-prometheus v0.0.0-20240122165117-baa5d82b5bc8 h1:nicadoSO2KafJRExlss8+PZkgH5OaCAujWSUV7EIB7E=
+github.com/grafana/mimir-prometheus v0.0.0-20240122165117-baa5d82b5bc8/go.mod h1:W4s/zaz2ypTeyg7h7HDJ4/g0+p5tXBWJ6ToK3g0a5zs=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
 github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
 github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
25 changes: 18 additions & 7 deletions integration/query_frontend_test.go
@@ -394,18 +394,21 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
 
 	wg.Wait()
 
-	extra := float64(2)
+	// Compute the expected number of queries.
+	expectedQueriesCount := float64(numUsers*numQueriesPerUser) + 2
+	expectedIngesterQueriesCount := float64(numUsers * numQueriesPerUser) // The "time()" query and the query with time range < "query ingesters within" are not pushed down to ingesters.
 	if cfg.queryStatsEnabled {
-		extra++
+		expectedQueriesCount++
+		expectedIngesterQueriesCount++
 	}
 
-	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))
+	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(expectedQueriesCount), "cortex_query_frontend_queries_total"))
 
-	// The number of received request is greater then the query requests because include
+	// The number of received requests may be greater than the query requests because they also include
 	// requests to /metrics and /ready.
-	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
-	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
-	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
+	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(expectedQueriesCount), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
+	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(expectedQueriesCount), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
+	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(expectedQueriesCount), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
 
 	// Ensure query stats metrics are tracked only when enabled.
 	if cfg.queryStatsEnabled {
@@ -417,6 +420,14 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
 		require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
 	}
 
+	// When the ingest storage is used, we expect that each query issued by this test was processed
+	// with strong read consistency by the ingester.
+	if flags["-ingest-storage.enabled"] == "true" {
+		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(expectedIngesterQueriesCount), "cortex_ingest_storage_strong_consistency_requests_total"))
+	} else {
+		require.NoError(t, ingester.WaitRemovedMetric("cortex_ingest_storage_strong_consistency_requests_total"))
+	}
+
 	// Ensure no service-specific metrics prefix is used by the wrong service.
 	assertServiceMetricsPrefixes(t, Distributor, distributor)
 	assertServiceMetricsPrefixes(t, Ingester, ingester)
193 changes: 153 additions & 40 deletions integration/ruler_test.go
@@ -35,13 +35,14 @@ import (
 
 	"github.com/grafana/mimir/integration/ca"
 	"github.com/grafana/mimir/integration/e2emimir"
+	"github.com/grafana/mimir/pkg/querier/api"
 )
 
 func TestRulerAPI(t *testing.T) {
 	var (
 		namespaceOne = "test_/encoded_+namespace/?"
 		namespaceTwo = "test_/encoded_+namespace/?/two"
-		ruleGroup    = createTestRuleGroup(t)
+		ruleGroup    = createTestRuleGroup()
 	)
 
 	s, err := e2e.NewScenario(networkName)
@@ -404,7 +405,7 @@
 
 func TestRulerAlertmanager(t *testing.T) {
 	var namespaceOne = "test_/encoded_+namespace/?"
-	ruleGroup := createTestRuleGroup(t)
+	ruleGroup := createTestRuleGroup()
 
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
@@ -458,7 +459,7 @@
 
 func TestRulerAlertmanagerTLS(t *testing.T) {
 	var namespaceOne = "test_/encoded_+namespace/?"
-	ruleGroup := createTestRuleGroup(t)
+	ruleGroup := createTestRuleGroup()
 
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
@@ -1094,6 +1095,128 @@ func TestRulerRemoteEvaluation(t *testing.T) {
 	}
 }
 
+func TestRulerRemoteEvaluation_ShouldEnforceStrongReadConsistencyForDependentRulesWhenUsingTheIngestStorage(t *testing.T) {
+	const (
+		ruleGroupNamespace = "test"
+		ruleGroupName      = "test"
+	)
+
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	t.Cleanup(s.Close)
+
+	flags := mergeFlags(
+		CommonStorageBackendFlags(),
+		RulerFlags(),
+		BlocksStorageFlags(),
+		IngestStorageFlags(),
+		map[string]string{
+			"-ingester.ring.replication-factor": "1",
+
+			// No strong read consistency by default for this test. We want the ruler to enforce the strong
+			// consistency when required.
+			"-ingest-storage.read-consistency": api.ReadConsistencyEventual,
+		},
+	)
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, mimirBucketName)
+	kafka := e2edb.NewKafka()
+	require.NoError(t, s.StartAndWaitReady(minio, consul, kafka))
+
+	// Start the query-frontend.
+	queryFrontend := e2emimir.NewQueryFrontend("query-frontend", flags)
+	require.NoError(t, s.Start(queryFrontend))
+	flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
+
+	// Use query-frontend for rule evaluation.
+	flags["-ruler.query-frontend.address"] = fmt.Sprintf("dns:///%s", queryFrontend.NetworkGRPCEndpoint())
+
+	// Start up services.
+	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
+	ingester := e2emimir.NewIngester("ingester-0", consul.NetworkHTTPEndpoint(), flags)
+	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
+
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
+	require.NoError(t, s.WaitReady(queryFrontend))
+
+	// Wait until the distributor is ready.
+	// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring.
+	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))
+
+	client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", userID)
+	require.NoError(t, err)
+
+	// Push a test series.
+	now := time.Now()
+	series, _, _ := generateFloatSeries("series_1", now)
+
+	res, err := client.Push(series)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+
+	t.Run("evaluation of independent rules should not require strong consistency", func(t *testing.T) {
+		ruler := e2emimir.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags)
+		require.NoError(t, s.StartAndWaitReady(ruler))
+		t.Cleanup(func() {
+			require.NoError(t, s.Stop(ruler))
+		})
+
+		rulerClient, err := e2emimir.NewClient("", "", "", ruler.HTTPEndpoint(), userID)
+		require.NoError(t, err)
+
+		// Create a rule group containing 2 independent rules.
+		group := ruleGroupWithRules(ruleGroupName, time.Second,
+			recordingRule("series_1:count", "count(series_1)"),
+			recordingRule("series_1:sum", "sum(series_1)"),
+		)
+		require.NoError(t, rulerClient.SetRuleGroup(group, ruleGroupNamespace))
+
+		// Clean up the ruler config when the test ends, so that it doesn't interfere with other test cases.
+		t.Cleanup(func() {
+			require.NoError(t, rulerClient.DeleteRuleNamespace(ruleGroupNamespace))
+		})
+
+		// Wait until the rules are evaluated.
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(len(group.Rules))), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics))
+
+		// The rules have been evaluated at least once. We expect the rule queries
+		// to have run with eventual consistency because they are independent.
+		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingest_storage_strong_consistency_requests_total"))
+	})
+
+	t.Run("evaluation of dependent rules should require strong consistency", func(t *testing.T) {
+		ruler := e2emimir.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags)
+		require.NoError(t, s.StartAndWaitReady(ruler))
+		t.Cleanup(func() {
+			require.NoError(t, s.Stop(ruler))
+		})
+
+		rulerClient, err := e2emimir.NewClient("", "", "", ruler.HTTPEndpoint(), userID)
+		require.NoError(t, err)
+
+		// Create a rule group containing 2 rules: the 2nd one depends on the 1st one.
+		group := ruleGroupWithRules(ruleGroupName, time.Second,
+			recordingRule("series_1:count", "count(series_1)"),
+			recordingRule("series_1:count:sum", "sum(series_1:count)"),
+		)
+		require.NoError(t, rulerClient.SetRuleGroup(group, ruleGroupNamespace))
+
+		// Clean up the ruler config when the test ends, so that it doesn't interfere with other test cases.
+		t.Cleanup(func() {
+			require.NoError(t, rulerClient.DeleteRuleNamespace(ruleGroupNamespace))
+		})
+
+		// Wait until the rules are evaluated.
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(len(group.Rules))), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WaitMissingMetrics))
+
+		// The rules have been evaluated at least once. We expect the 2nd rule query
+		// to have run with strong consistency because it depends on the 1st one.
+		require.NoError(t, ingester.WaitSumMetrics(e2e.GreaterOrEqual(1), "cortex_ingest_storage_strong_consistency_requests_total"))
+	})
+}
+
 func TestRuler_RestoreWithLongForPeriod(t *testing.T) {
 	const (
 		forGracePeriod = 5 * time.Second
@@ -1304,58 +1427,48 @@ func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
 }
 
 func ruleGroupWithRecordingRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
-	var recordNode = yaml.Node{}
-	var exprNode = yaml.Node{}
-
-	recordNode.SetString(ruleName)
-	exprNode.SetString(expression)
-
-	return rulefmt.RuleGroup{
-		Name:     groupName,
-		Interval: 10,
-		Rules: []rulefmt.RuleNode{{
-			Record: recordNode,
-			Expr:   exprNode,
-		}},
-	}
+	return ruleGroupWithRules(groupName, 10, recordingRule(ruleName, expression))
 }
 
 func ruleGroupWithAlertingRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
-	var recordNode = yaml.Node{}
-	var exprNode = yaml.Node{}
-
-	recordNode.SetString(ruleName)
-	exprNode.SetString(expression)
-
-	return rulefmt.RuleGroup{
-		Name:     groupName,
-		Interval: 10,
-		Rules: []rulefmt.RuleNode{{
-			Alert: recordNode,
-			Expr:  exprNode,
-			For:   30,
-		}},
-	}
+	return ruleGroupWithRules(groupName, 10, alertingRule(ruleName, expression))
 }
 
-func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
-	t.Helper()
-
-	var (
-		recordNode = yaml.Node{}
-		exprNode   = yaml.Node{}
-	)
-
-	recordNode.SetString("test_rule")
-	exprNode.SetString("up")
-	return rulefmt.RuleGroup{
-		Name:     "test_encoded_+\"+group_name/?",
-		Interval: 100,
-		Rules: []rulefmt.RuleNode{
-			{
-				Record: recordNode,
-				Expr:   exprNode,
-			},
-		},
-	}
+func ruleGroupWithRules(groupName string, interval time.Duration, rules ...rulefmt.RuleNode) rulefmt.RuleGroup {
+	return rulefmt.RuleGroup{
+		Name:     groupName,
+		Interval: model.Duration(interval),
+		Rules:    rules,
+	}
+}
+
+func createTestRuleGroup() rulefmt.RuleGroup {
+	return ruleGroupWithRules("test_encoded_+\"+group_name/?", 100, recordingRule("test_rule", "up"))
+}
+
+func recordingRule(record, expr string) rulefmt.RuleNode {
+	var recordNode = yaml.Node{}
+	var exprNode = yaml.Node{}
+
+	recordNode.SetString(record)
+	exprNode.SetString(expr)
+
+	return rulefmt.RuleNode{
+		Record: recordNode,
+		Expr:   exprNode,
+	}
+}
+
+func alertingRule(alert, expr string) rulefmt.RuleNode {
+	var alertNode = yaml.Node{}
+	var exprNode = yaml.Node{}
+
+	alertNode.SetString(alert)
+	exprNode.SetString(expr)
+
+	return rulefmt.RuleNode{
+		Alert: alertNode,
+		Expr:  exprNode,
+		For:   30,
+	}
 }
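For reference, the refactored helpers compose like this inside a test (a fragment, assuming the test harness and helpers from this file):

```go
// Build a group whose 2nd rule reads what the 1st one records: the shape
// that the new dependent-rules subtest exercises.
group := ruleGroupWithRules("example", time.Second,
	recordingRule("series_1:count", "count(series_1)"),
	alertingRule("SeriesCountTooHigh", "series_1:count > 10"),
)
require.NoError(t, rulerClient.SetRuleGroup(group, "example-namespace"))
```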
12 changes: 8 additions & 4 deletions pkg/ruler/compat.go
@@ -263,7 +263,7 @@ type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.
 func DefaultTenantManagerFactory(
 	cfg Config,
 	p Pusher,
-	embeddedQueryable storage.Queryable,
+	queryable storage.Queryable,
 	queryFunc rules.QueryFunc,
 	overrides RulesLimits,
 	reg prometheus.Registerer,
@@ -304,14 +304,18 @@ func DefaultTenantManagerFactory(
 		queryTime = rulerQuerySeconds.WithLabelValues(userID)
 		zeroFetchedSeriesCount = zeroFetchedSeriesQueries.WithLabelValues(userID)
 	}
-	var wrappedQueryFunc rules.QueryFunc
 
-	wrappedQueryFunc = MetricsQueryFunc(queryFunc, totalQueries, failedQueries)
+	// Wrap the query function with our custom logic.
+	wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc)
+	wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, totalQueries, failedQueries)
 	wrappedQueryFunc = RecordAndReportRuleQueryMetrics(wrappedQueryFunc, queryTime, zeroFetchedSeriesCount, logger)
 
+	// Wrap the queryable with our custom logic.
+	wrappedQueryable := WrapQueryableWithReadConsistency(queryable)
+
 	return rules.NewManager(&rules.ManagerOptions{
 		Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
-		Queryable:  embeddedQueryable,
+		Queryable:  wrappedQueryable,
 		QueryFunc:  wrappedQueryFunc,
 		Context:    user.InjectOrgID(ctx, userID),
 		GroupEvaluationContextFunc: FederatedGroupContextFunc,
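The diff wires `WrapQueryFuncWithReadConsistency` and `WrapQueryableWithReadConsistency` into the rule manager, but their bodies fall outside this excerpt. Below is a minimal sketch of what the query-function side could look like, assuming the forked Prometheus exposes a per-rule dependency flag through `rules.FromOriginContext` and that Mimir's `querier/api` package offers a `ContextWithReadConsistency` helper (both are assumptions here, not code from this PR):

```go
package ruler

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/rules"

	"github.com/grafana/mimir/pkg/querier/api"
)

// wrapQueryFuncWithReadConsistency is a sketch, not the PR's actual code. It
// forces strong read consistency only when the evaluated rule may read the
// output of another rule in the same group (read-after-write), and otherwise
// leaves the configured default (e.g. eventual) in place.
func wrapQueryFuncWithReadConsistency(next rules.QueryFunc) rules.QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		// Assumption: the forked Prometheus annotates each evaluation context
		// with rule metadata, including whether the rule depends on no other rule.
		detail := rules.FromOriginContext(ctx)
		if detail.NoDependencyRules {
			// No same-group write this query needs to read back.
			return next(ctx, qs, t)
		}

		// Assumption: a context helper propagates the consistency level down the
		// ingest-storage read path (mirroring api.ReadConsistencyEventual used
		// in the integration test above).
		ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
		return next(ctx, qs, t)
	}
}
```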