Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add ExponentialHistogram support for metrics v4 query range #4525

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ processors:
latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
dimensions_cache_size: 100000
aggregation_temporality: AGGREGATION_TEMPORALITY_DELTA
enable_exp_histogram: true
dimensions:
- name: service.namespace
default: default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ processors:
latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 50ms, 100ms, 250ms, 500ms, 1000ms, 1400ms, 2000ms, 5s, 10s, 20s, 40s, 60s ]
dimensions_cache_size: 100000
aggregation_temporality: AGGREGATION_TEMPORALITY_DELTA
enable_exp_histogram: true
dimensions:
- name: service.namespace
default: default
Expand Down
34 changes: 34 additions & 0 deletions pkg/query-service/app/metrics/v4/delta/time_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,40 @@ func TestPrepareTimeseriesQuery(t *testing.T) {
end: 1701796780000,
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
{
name: "test time aggregation = rate, space aggregation percentile99, type = ExponentialHistogram",
builderQuery: &v3.BuilderQuery{
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "signoz_latency",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType(v3.MetricTypeExponentialHistogram),
IsColumn: true,
IsJSON: false,
},
Temporality: v3.Delta,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{},
},
GroupBy: []v3.AttributeKey{
{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
Expression: "A",
Disabled: false,
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationPercentile99,
},
start: 1701794980000,
end: 1701796780000,
expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, quantilesDDMerge(0.01, 0.990000)(sketch)[1] as value FROM signoz_metrics.distributed_exp_hist INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'signoz_latency' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC",
},
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
}

for _, testCase := range testCases {
Expand Down
24 changes: 23 additions & 1 deletion pkg/query-service/app/metrics/v4/delta/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils"
)

// TODO(srikanthccv): support multiple quantiles; see https://github.com/SigNoz/signoz/issues/4016#issuecomment-1838583305
var (
sketchFmt = "quantilesDDMerge(0.01, %f)(sketch)[1]"
)

// prepareTimeAggregationSubQuery builds the sub-query to be used for temporal aggregation
func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) {

Expand Down Expand Up @@ -84,12 +89,16 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,

samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end)

var tableName string = constants.SIGNOZ_SAMPLES_V4_TABLENAME
if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) {
tableName = "distributed_exp_hist"
}
// Select the aggregate value for interval
queryTmpl :=
"SELECT %s" +
" toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," +
" %s as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + tableName +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
Expand All @@ -110,6 +119,13 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string,
case v3.SpaceAggregationMax:
op := "max(value)"
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
case v3.SpaceAggregationPercentile50,
v3.SpaceAggregationPercentile75,
v3.SpaceAggregationPercentile90,
v3.SpaceAggregationPercentile95,
v3.SpaceAggregationPercentile99:
op := fmt.Sprintf(sketchFmt, v3.GetPercentileFromOperator(mq.SpaceAggregation))
query = fmt.Sprintf(queryTmpl, selectLabels, step, op, timeSeriesSubQuery, groupBy, orderBy)
}
return query, nil
}
Expand Down Expand Up @@ -178,6 +194,9 @@ func PrepareMetricQueryDeltaTimeSeries(start, end, step int64, mq *v3.BuilderQue
// 4. time aggregation = max and space aggregation = max
// - max of maxs is same as max of all values
//
// 5. special case exphist, there is no need for per series/fingerprint aggregation
// we can directly use the quantilesDDMerge function
//
// all of this is true only for delta metrics
func canShortCircuit(mq *v3.BuilderQuery) bool {
if (mq.TimeAggregation == v3.TimeAggregationRate || mq.TimeAggregation == v3.TimeAggregationIncrease) && mq.SpaceAggregation == v3.SpaceAggregationSum {
Expand All @@ -192,5 +211,8 @@ func canShortCircuit(mq *v3.BuilderQuery) bool {
if mq.TimeAggregation == v3.TimeAggregationMax && mq.SpaceAggregation == v3.SpaceAggregationMax {
return true
}
if mq.AggregateAttribute.Type == v3.AttributeKeyType(v3.MetricTypeExponentialHistogram) && v3.IsPercentileOperator(mq.SpaceAggregation) {
return true
}
return false
}
34 changes: 0 additions & 34 deletions pkg/query-service/app/metrics/v4/helpers/clauses.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string {
return strings.Join(groupTags, ", ")
}

func GroupByAttributeKeyTagsWithoutLe(tags ...v3.AttributeKey) string {
groupTags := []string{}
for _, tag := range tags {
if tag.Key != "le" {
groupTags = append(groupTags, tag.Key)
}
}
groupTags = append(groupTags, "ts")
return strings.Join(groupTags, ", ")
}

// OrderByAttributeKeyTags returns a string of comma separated tags for order by clause
// if the order is not specified, it defaults to ASC
func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string {
Expand All @@ -71,29 +60,6 @@ func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string
return strings.Join(orderBy, ", ")
}

func OrderByAttributeKeyTagsWithoutLe(items []v3.OrderBy, tags []v3.AttributeKey) string {
var orderBy []string
for _, tag := range tags {
if tag.Key != "le" {
found := false
for _, item := range items {
if item.ColumnName == tag.Key {
found = true
orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order))
break
}
}
if !found {
orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key))
}
}
}

orderBy = append(orderBy, "ts ASC")

return strings.Join(orderBy, ", ")
}

func SelectLabelsAny(tags []v3.AttributeKey) string {
var selectLabelsAny []string
for _, tag := range tags {
Expand Down
11 changes: 11 additions & 0 deletions pkg/query-service/model/v3/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,17 @@ func (t TimeAggregation) IsRateOperator() bool {
}
}

type MetricType string

const (
MetricTypeUnspecified MetricType = ""
MetricTypeSum MetricType = "Sum"
MetricTypeGauge MetricType = "Gauge"
MetricTypeHistogram MetricType = "Histogram"
MetricTypeSummary MetricType = "Summary"
MetricTypeExponentialHistogram MetricType = "ExponentialHistogram"
)

type SpaceAggregation string

const (
Expand Down
Loading