diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index e7a482d02f..59e76c0ec7 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -213,6 +213,7 @@ const ( SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day" + SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day" ) var TimeoutExcludedRoutes = map[string]bool{ diff --git a/pkg/query-service/rules/apiParams.go b/pkg/query-service/rules/apiParams.go index 6000ec280f..0ccf885b3d 100644 --- a/pkg/query-service/rules/apiParams.go +++ b/pkg/query-service/rules/apiParams.go @@ -50,6 +50,8 @@ type PostableRule struct { PreferredChannels []string `json:"preferredChannels,omitempty"` + Version string `json:"version,omitempty"` + // legacy Expr string `yaml:"expr,omitempty" json:"expr,omitempty"` OldYaml string `json:"yaml,omitempty"` diff --git a/pkg/query-service/rules/thresholdRule.go b/pkg/query-service/rules/thresholdRule.go index 9687038a40..0aeef44aee 100644 --- a/pkg/query-service/rules/thresholdRule.go +++ b/pkg/query-service/rules/thresholdRule.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.signoz.io/signoz/pkg/query-service/converter" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" @@ -31,6 +32,7 @@ import ( logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/formatter" @@ -59,7 +61,14 @@ type ThresholdRule struct { // map of active alerts active map[uint64]*Alert - queryBuilder *queryBuilder.QueryBuilder + queryBuilder *queryBuilder.QueryBuilder + version string + queryBuilderV4 *queryBuilder.QueryBuilder + // temporalityMap is a map of metric name to temporality + // to avoid fetching temporality for the same metric multiple times + // querying the v4 table on low cardinal temporality column + // should be fast but we can still avoid the query if we have the data in memory + temporalityMap map[string]map[v3.Temporality]bool opts ThresholdRuleOpts typ string @@ -102,6 +111,8 @@ func NewThresholdRule( active: map[uint64]*Alert{}, opts: opts, typ: p.AlertType, + version: p.Version, + temporalityMap: make(map[string]map[v3.Temporality]bool), } if int64(t.evalWindow) == 0 { @@ -115,6 +126,13 @@ func NewThresholdRule( } t.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, featureFlags) + builderOptsV4 := queryBuilder.QueryBuilderOptions{ + BuildMetricQuery: metricsV4.PrepareMetricQuery, + BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildLogQuery: logsv3.PrepareLogsQuery, + } + t.queryBuilderV4 = queryBuilder.NewQueryBuilder(builderOptsV4, featureFlags) + zap.S().Info("msg:", "creating new alerting rule", "\t name:", t.name, "\t condition:", t.ruleCondition.String(), "\t generatorURL:", t.GeneratorURL()) return &t, nil @@ -274,6 +292,87 @@ func (r *ThresholdRule) ActiveAlerts() []*Alert { return res } +func (r *ThresholdRule) FetchTemporality(ctx context.Context, metricNames []string, ch driver.Conn) (map[string]map[v3.Temporality]bool, error) { + + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) + + query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME) + + rows, err := ch.Query(ctx, query, metricNames) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var metricName, temporality string + err := rows.Scan(&metricName, &temporality) + if err != nil { + return nil, err + } + if _, ok := metricNameToTemporality[metricName]; !ok { + metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) + } + metricNameToTemporality[metricName][v3.Temporality(temporality)] = true + } + fmt.Println("metricNameToTemporality", metricNameToTemporality) + return metricNameToTemporality, nil +} + +// populateTemporality same as addTemporality but for v4 and better +func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3, ch driver.Conn) error { + + fmt.Println("populate", r.temporalityMap) + missingTemporality := make([]string, 0) + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for _, query := range qp.CompositeQuery.BuilderQueries { + // if there is no temporality specified in the query but we have it in the map + // then use the value from the map + if query.Temporality == "" && r.temporalityMap[query.AggregateAttribute.Key] != nil { + // We prefer delta if it is available + if r.temporalityMap[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if r.temporalityMap[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + } + // we don't have temporality for this metric + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { + missingTemporality = append(missingTemporality, query.AggregateAttribute.Key) + } + if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { + metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) + } + } + } + + nameToTemporality, err := r.FetchTemporality(ctx, missingTemporality, ch) + fmt.Println("nameToTemporality", nameToTemporality, err) + if err != nil { + return err + } + + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for name := range qp.CompositeQuery.BuilderQueries { + query := qp.CompositeQuery.BuilderQueries[name] + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { + if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + r.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key] + } + } + } + return nil +} + // ForEachActiveAlert runs the given function on each alert. // This should be used when you want to use the actual alerts from the ThresholdRule // and not on its copy. @@ -626,7 +725,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer return result, nil } -func (r *ThresholdRule) prepareBuilderQueries(ts time.Time) (map[string]string, error) { +func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map[string]string, error) { params := r.prepareQueryRange(ts) if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { // check if any enrichment is required for logs if yes then enrich them @@ -638,7 +737,20 @@ func (r *ThresholdRule) prepareBuilderQueries(ts time.Time) (map[string]string, } - runQueries, err := r.queryBuilder.PrepareQueries(params) + fmt.Println("here", ch) + + if ch != nil { + r.populateTemporality(context.Background(), params, ch) + } + + var runQueries map[string]string + var err error + + if r.version == "v4" { + runQueries, err = r.queryBuilderV4.PrepareQueries(params) + } else { + runQueries, err = r.queryBuilder.PrepareQueries(params) + } return runQueries, err } @@ -896,7 +1008,7 @@ func (r *ThresholdRule) GetSelectedQuery() string { var err error if r.ruleCondition.QueryType() == v3.QueryTypeBuilder { - queries, err = r.prepareBuilderQueries(time.Now()) + queries, err = r.prepareBuilderQueries(time.Now(), nil) if err != nil { zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare metric queries", zap.Error(err)) return "" @@ -950,7 +1062,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c // fetch the target query based on query type if r.ruleCondition.QueryType() == v3.QueryTypeBuilder { - queries, err = r.prepareBuilderQueries(ts) + queries, err = r.prepareBuilderQueries(ts, ch) if err != nil { zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare metric queries", zap.Error(err))