Skip to content

Commit

Permalink
chore: add v4 support for rules
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Feb 27, 2024
1 parent 1eb7f3e commit 8da74c4
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/query-service/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ const (
SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4"
SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs"
SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day"
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day"
)

var TimeoutExcludedRoutes = map[string]bool{
Expand Down
2 changes: 2 additions & 0 deletions pkg/query-service/rules/apiParams.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PostableRule struct {

PreferredChannels []string `json:"preferredChannels,omitempty"`

Version string `json:"version,omitempty"`

// legacy
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"`
OldYaml string `json:"yaml,omitempty"`
Expand Down
122 changes: 117 additions & 5 deletions pkg/query-service/rules/thresholdRule.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.signoz.io/signoz/pkg/query-service/converter"

"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
Expand All @@ -31,6 +32,7 @@ import (

logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/formatter"

Expand Down Expand Up @@ -59,7 +61,14 @@ type ThresholdRule struct {
// map of active alerts
active map[uint64]*Alert

queryBuilder *queryBuilder.QueryBuilder
queryBuilder *queryBuilder.QueryBuilder
version string
queryBuilderV4 *queryBuilder.QueryBuilder
// temporalityMap is a map of metric name to temporality
// to avoid fetching temporality for the same metric multiple times
// querying the v4 table on low cardinal temporality column
// should be fast but we can still avoid the query if we have the data in memory
temporalityMap map[string]map[v3.Temporality]bool

opts ThresholdRuleOpts
typ string
Expand Down Expand Up @@ -102,6 +111,8 @@ func NewThresholdRule(
active: map[uint64]*Alert{},
opts: opts,
typ: p.AlertType,
version: p.Version,
temporalityMap: make(map[string]map[v3.Temporality]bool),
}

if int64(t.evalWindow) == 0 {
Expand All @@ -115,6 +126,13 @@ func NewThresholdRule(
}
t.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, featureFlags)

builderOptsV4 := queryBuilder.QueryBuilderOptions{
BuildMetricQuery: metricsV4.PrepareMetricQuery,
BuildTraceQuery: tracesV3.PrepareTracesQuery,
BuildLogQuery: logsv3.PrepareLogsQuery,
}
t.queryBuilderV4 = queryBuilder.NewQueryBuilder(builderOptsV4, featureFlags)

zap.S().Info("msg:", "creating new alerting rule", "\t name:", t.name, "\t condition:", t.ruleCondition.String(), "\t generatorURL:", t.GeneratorURL())

return &t, nil
Expand Down Expand Up @@ -274,6 +292,87 @@ func (r *ThresholdRule) ActiveAlerts() []*Alert {
return res
}

func (r *ThresholdRule) FetchTemporality(ctx context.Context, metricNames []string, ch driver.Conn) (map[string]map[v3.Temporality]bool, error) {

metricNameToTemporality := make(map[string]map[v3.Temporality]bool)

query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME)

rows, err := ch.Query(ctx, query, metricNames)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
var metricName, temporality string
err := rows.Scan(&metricName, &temporality)
if err != nil {
return nil, err
}
if _, ok := metricNameToTemporality[metricName]; !ok {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
}
fmt.Println("metricNameToTemporality", metricNameToTemporality)
return metricNameToTemporality, nil
}

// populateTemporality same as addTemporality but for v4 and better
func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3, ch driver.Conn) error {

fmt.Println("populate", r.temporalityMap)
missingTemporality := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for _, query := range qp.CompositeQuery.BuilderQueries {
// if there is no temporality specified in the query but we have it in the map
// then use the value from the map
if query.Temporality == "" && r.temporalityMap[query.AggregateAttribute.Key] != nil {
// We prefer delta if it is available
if r.temporalityMap[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if r.temporalityMap[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
}
// we don't have temporality for this metric
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
missingTemporality = append(missingTemporality, query.AggregateAttribute.Key)
}
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
}
}
}

nameToTemporality, err := r.FetchTemporality(ctx, missingTemporality, ch)
fmt.Println("nameToTemporality", nameToTemporality, err)
if err != nil {
return err
}

if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for name := range qp.CompositeQuery.BuilderQueries {
query := qp.CompositeQuery.BuilderQueries[name]
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
r.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
}
}
}
return nil
}

// ForEachActiveAlert runs the given function on each alert.
// This should be used when you want to use the actual alerts from the ThresholdRule
// and not on its copy.
Expand Down Expand Up @@ -626,7 +725,7 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
return result, nil
}

func (r *ThresholdRule) prepareBuilderQueries(ts time.Time) (map[string]string, error) {
func (r *ThresholdRule) prepareBuilderQueries(ts time.Time, ch driver.Conn) (map[string]string, error) {
params := r.prepareQueryRange(ts)
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if any enrichment is required for logs if yes then enrich them
Expand All @@ -638,7 +737,20 @@ func (r *ThresholdRule) prepareBuilderQueries(ts time.Time) (map[string]string,

}

runQueries, err := r.queryBuilder.PrepareQueries(params)
fmt.Println("here", ch)

if ch != nil {
r.populateTemporality(context.Background(), params, ch)
}

var runQueries map[string]string
var err error

if r.version == "v4" {
runQueries, err = r.queryBuilderV4.PrepareQueries(params)
} else {
runQueries, err = r.queryBuilder.PrepareQueries(params)
}

return runQueries, err
}
Expand Down Expand Up @@ -896,7 +1008,7 @@ func (r *ThresholdRule) GetSelectedQuery() string {
var err error

if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {
queries, err = r.prepareBuilderQueries(time.Now())
queries, err = r.prepareBuilderQueries(time.Now(), nil)
if err != nil {
zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare metric queries", zap.Error(err))
return ""
Expand Down Expand Up @@ -950,7 +1062,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
// fetch the target query based on query type
if r.ruleCondition.QueryType() == v3.QueryTypeBuilder {

queries, err = r.prepareBuilderQueries(ts)
queries, err = r.prepareBuilderQueries(ts, ch)

if err != nil {
zap.S().Errorf("ruleid:", r.ID(), "\t msg: failed to prepare metric queries", zap.Error(err))
Expand Down

0 comments on commit 8da74c4

Please sign in to comment.