Skip to content

Commit

Permalink
chore: add prepare query for delta/unspecified table (#4168)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Jan 7, 2024
1 parent 7d960b7 commit 525dea3
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 3 deletions.
49 changes: 49 additions & 0 deletions pkg/query-service/app/metrics/v4/delta/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package delta

import (
"fmt"

v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

// prepareMetricQueryDeltaTable builds the query to be used for fetching metrics
func prepareMetricQueryDeltaTable(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
var query string

temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
if err != nil {
return "", err
}

groupBy := groupingSetsByAttributeKeyTags(mq.GroupBy...)
orderBy := orderByAttributeKeyTags(mq.OrderBy, mq.GroupBy)
selectLabels := groupByAttributeKeyTags(mq.GroupBy...)

queryTmpl :=
"SELECT %s," +
" %s as value" +
" FROM (%s)" +
" WHERE isNaN(per_series_value) = 0" +
" GROUP BY %s" +
" ORDER BY %s"

switch mq.SpaceAggregation {
case v3.SpaceAggregationAvg:
op := "avg(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationSum:
op := "sum(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMin:
op := "min(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationMax:
op := "max(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
case v3.SpaceAggregationCount:
op := "count(per_series_value)"
query = fmt.Sprintf(queryTmpl, selectLabels, op, temporalAggSubQuery, groupBy, orderBy)
}

return query, nil
}
114 changes: 114 additions & 0 deletions pkg/query-service/app/metrics/v4/delta/table_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package delta

import (
"testing"

"github.com/stretchr/testify/assert"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestPrepareTableQuery(t *testing.T) {
// The table query is almost the same as the time series query, except that
// each row will be reduced to a single value using the `ReduceTo` aggregation
testCases := []struct {
name string
builderQuery *v3.BuilderQuery
start int64
end int64
expectedQueryContains string
}{
{
name: "test time aggregation = avg, space aggregation = sum, temporality = unspecified",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "system_memory_usage",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Unspecified,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "state",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorNotEqual,
Value: "idle",
},
},
},
GroupBy: []v3.AttributeKey{},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationSum,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC",
},
{
name: "test time aggregation = rate, space aggregation = sum, temporality = delta",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "http_requests",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeUnspecified,
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorContains,
Value: "payment_service",
},
},
},
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
}},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareMetricQueryDeltaTable(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,
testCase.builderQuery,
)
assert.Nil(t, err)
assert.Contains(t, query, testCase.expectedQueryContains)
})
}
}
2 changes: 1 addition & 1 deletion pkg/query-service/app/metrics/v4/delta/time_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
query, err := prepareTimeAggregationSubQueryTimeSeries(
query, err := prepareTimeAggregationSubQuery(
testCase.start,
testCase.end,
testCase.builderQuery.StepInterval,
Expand Down
4 changes: 2 additions & 2 deletions pkg/query-service/app/metrics/v4/delta/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// prepareTimeAggregationSubQueryTimeSeries builds the sub-query to be used for temporal aggregation
func prepareTimeAggregationSubQueryTimeSeries(start, end, step int64, mq *v3.BuilderQuery) (string, error) {
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {

var subQuery string

Expand Down Expand Up @@ -81,7 +81,7 @@ func prepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue

var query string

temporalAggSubQuery, err := prepareTimeAggregationSubQueryTimeSeries(start, end, step, mq)
temporalAggSubQuery, err := prepareTimeAggregationSubQuery(start, end, step, mq)
if err != nil {
return "", err
}
Expand Down

0 comments on commit 525dea3

Please sign in to comment.