This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
/
func_aggregate.go
120 lines (98 loc) · 2.92 KB
/
func_aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package expr
import (
"math"
"strings"
"unsafe"
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)
// FuncAggregate implements the Graphite aggregate() family of functions:
// it collapses all input series into a single output series using the
// cross-series aggregation function identified by name.
type FuncAggregate struct {
// input series-producing functions to consume
in []GraphiteFunc
// aggregation function name (e.g. "sum", "avg"); validated by IsAggFunc
// when taken as a call argument, or pre-bound via NewAggregateConstructor
name string
// optional threshold: fraction of input series that must have a value at a
// timestamp for the aggregated point to be kept (see crossSeriesXff)
xFilesFactor float64
}
// NewAggregateConstructor returns a factory that builds FuncAggregate
// instances pre-bound to the given aggregation function name. This backs
// the named variants such as sumSeries, averageSeries, etc.
func NewAggregateConstructor(name string) func() GraphiteFunc {
	constructor := func() GraphiteFunc {
		f := FuncAggregate{name: name}
		return &f
	}
	return constructor
}
// NewAggregate returns a FuncAggregate with no aggregation function bound;
// in this form the function name must be supplied as a call argument
// (see Signature).
func NewAggregate() GraphiteFunc {
	f := &FuncAggregate{}
	return f
}
// Signature declares the input and output arguments of the function.
// When no aggregation function name was pre-bound (generic aggregate() form),
// the name is required as the "func" argument and an optional xFilesFactor
// may be given; the pre-bound form (e.g. sumSeries) takes only series inputs.
// Idiom fix: early return instead of else-after-return.
func (s *FuncAggregate) Signature() ([]Arg, []Arg) {
	if s.name == "" {
		// Generic form: aggregate(seriesLists, func, xFilesFactor=...)
		return []Arg{
			ArgSeriesLists{val: &s.in},
			ArgString{val: &s.name, validator: []Validator{IsAggFunc}, key: "func"},
			ArgFloat{val: &s.xFilesFactor, opt: true, key: "xFilesFactor"},
		}, []Arg{ArgSeries{}}
	}
	// Pre-bound form: only the series inputs are accepted.
	return []Arg{
		ArgSeriesLists{val: &s.in},
	}, []Arg{ArgSeries{}}
}
// Context returns the execution context with PNGroup set to the receiver's
// own address, which serves as a process-unique group identifier.
// NOTE(review): presumably this groups the fetches under this function for
// pre-normalization together — confirm against PNGroup consumer code.
func (s *FuncAggregate) Context(context Context) Context {
	// The pointer value of s is unique per FuncAggregate instance, making it
	// a cheap collision-free group ID for the lifetime of this plan.
	context.PNGroup = models.PNGroup(uintptr(unsafe.Pointer(s)))
	return context
}
// Exec consumes the input series, normalizes them to a common interval, and
// aggregates them into a single output series using the configured function
// and xFilesFactor.
func (s *FuncAggregate) Exec(dataMap DataMap) ([]models.Series, error) {
	in, patterns, err := consumeFuncs(dataMap, s.in)
	if err != nil {
		return nil, err
	}
	// Nothing to aggregate: pass the empty result straight through.
	if len(in) == 0 {
		return in, nil
	}
	aggregator := seriesAggregator{
		function: getCrossSeriesAggFunc(s.name),
		name:     s.name,
	}
	normalized := Normalize(dataMap, in)
	return aggregate(dataMap, normalized, patterns, aggregator, s.xFilesFactor)
}
// aggregate aggregates series using the requested aggregator and xFilesFactor and returns an output slice of length 1.
// Callers are expected to have normalized the input series to a common
// interval first (see Exec). The returned series is derived from series[0],
// with target/name, tags, datapoints, consolidators, and meta replaced.
func aggregate(dataMap DataMap, series []models.Series, queryPatts []string, agg seriesAggregator, xFilesFactor float64) ([]models.Series, error) {
// Single-series shortcut: nothing to aggregate, just rename in place.
if len(series) == 1 {
name := agg.name + "Series(" + series[0].QueryPatt + ")"
series[0].Target = name
series[0].QueryPatt = name
return series, nil
}
// Take a scratch point slice from the pool; agg.function fills it via the
// pointer. NOTE(review): assumes the pooled slice has len 0 (or that
// agg.function resets it) — confirm against pointSlicePool's Put sites.
out := pointSlicePool.Get().([]schema.Point)
agg.function(series, &out)
//remove values in accordance to xFilesFactor
if !skipCrossSeriesXff(xFilesFactor) {
for i := 0; i < len(series[0].Datapoints); i++ {
// NaN out timestamps where too few input series have a value.
if !crossSeriesXff(series, i, xFilesFactor) {
out[i].Val = math.NaN()
}
}
}
// The tags for the aggregated series is only the tags that are
// common to all input series
commonTags := series[0].CopyTags()
var meta models.SeriesMeta
for _, serie := range series {
meta = meta.Merge(serie.Meta)
for k, v := range serie.Tags {
// Drop any tag whose value differs (or is absent, yielding "")
// in this series; only the intersection survives.
if commonTags[k] != v {
delete(commonTags, k)
}
}
}
cons, queryCons := summarizeCons(series)
name := agg.name + "Series(" + strings.Join(queryPatts, ",") + ")"
commonTags["aggregatedBy"] = agg.name
// Keep an existing common "name" tag; otherwise fall back to the
// generated series name.
if _, ok := commonTags["name"]; !ok {
commonTags["name"] = name
}
// Base the output on the first input series, overriding aggregate fields.
output := series[0]
output.Target = name
output.QueryPatt = name
output.Tags = commonTags
output.Datapoints = out
output.QueryCons = queryCons
output.Consolidator = cons
output.Meta = meta
// Register the output with the dataMap. NOTE(review): presumably this is
// what lets the pooled datapoints slice be reclaimed after the request —
// confirm against DataMap's cleanup semantics.
dataMap.Add(Req{}, output)
return []models.Series{output}, nil
}