normalize.go

package expr

import (
	"fmt"
	"math"

	"github.com/grafana/metrictank/api/models"
	"github.com/grafana/metrictank/consolidation"
	"github.com/grafana/metrictank/schema"
	"github.com/grafana/metrictank/util"
	"github.com/grafana/metrictank/util/align"
)

// Normalize normalizes the given series to their common LCM interval, if they don't already share the same interval.
// Any adjusted series is backed by a datapoints slice drawn from the pool, and is added to the dataMap so it can be reclaimed later.
func Normalize(dataMap DataMap, in []models.Series) []models.Series {
	if len(in) < 2 {
		return in
	}
	var intervals []uint32
	for _, s := range in {
		if s.Interval == 0 {
			panic("illegal interval 0")
		}
		intervals = append(intervals, s.Interval)
	}
	lcm := util.Lcm(intervals)

	for i, s := range in {
		if s.Interval != lcm {
			in[i] = NormalizeTo(dataMap, s, lcm)
		}
	}
	return in
}
// NormalizeTwo normalizes the two given series to their common LCM interval,
// if they don't already share the same interval.
func NormalizeTwo(dataMap DataMap, a, b models.Series) (models.Series, models.Series) {
	if a.Interval == b.Interval {
		return a, b
	}
	intervals := []uint32{a.Interval, b.Interval}
	lcm := util.Lcm(intervals)

	if a.Interval != lcm {
		a = NormalizeTo(dataMap, a, lcm)
	}
	if b.Interval != lcm {
		b = NormalizeTo(dataMap, b, lcm)
	}
	return a, b
}

// NormalizeTo normalizes the given series to the desired interval.
// It will pad the front and strip from the back as needed, to assure the output is canonical for the given interval.
// The following MUST hold when calling this:
// * interval > in.Interval
// * interval % in.Interval == 0
func NormalizeTo(dataMap DataMap, in models.Series, interval uint32) models.Series {
	if len(in.Datapoints) == 0 {
		panic(fmt.Sprintf("series %q cannot be normalized from interval %d to %d because it is empty", in.Target, in.Interval, interval))
	}

	// we need to copy the datapoints first because the consolidator will reuse the input slice.
	// also, for the consolidator's output to be canonical, the input must be pre-canonical,
	// so add nulls in front and at the back to make it pre-canonical.
	// this may make points at the front and the back less accurate when consolidated
	// (e.g. summing when some of the points are null results in a lower value), but this is what Graphite does....
	datapoints := pointSlicePool.Get().([]schema.Point)
	datapoints = makePreCanonicalCopy(in, interval, datapoints)

	// the series may have been created by a function that didn't know which consolidation function to default to.
	// in the future maybe we can do more clever things here, e.g. have perSecond consolidate by max.
	if in.Consolidator == 0 {
		in.Consolidator = consolidation.Avg
	}

	in.Datapoints = consolidation.Consolidate(datapoints, interval/in.Interval, in.Consolidator)
	in.Interval = interval
	dataMap.Add(Req{}, in)
	return in
}
// makePreCanonicalCopy returns a copy of in's datapoints slice, but adjusted to be pre-canonical with respect to interval.
// for this, it reuses the 'datapoints' slice.
func makePreCanonicalCopy(in models.Series, interval uint32, datapoints []schema.Point) []schema.Point {
	// to achieve this we need to assure our input starts and ends with the right timestamp.
	// we need to figure out the ts of the first point to feed into the consolidator.
	// example of how this works:
	// if in.Interval is 5, and interval is 15, then to generate point 15, because we postmark and we want a full input going into this point,
	// we want inputs 5, 10 and 15.
	// or more generally (you can follow any example vertically):
	//  5 10 15 20 25 30 35 40 45 50 <-- if any of these timestamps is the ts of the first point in `in`,
	//  5  5  5 20 20 20 35 35 35 50 <-- then these are the corresponding timestamps of the first values we want as input for the consolidator;
	// 15 15 15 30 30 30 45 45 45 60 <-- the first row, fed through align.ForwardIfNotAligned(), results in these numbers,
	//  5  5  5 20 20 20 35 35 35 50 <-- from which we subtract (aggnum-1)*in.Interval, or equivalently -interval + in.Interval = -15 + 5 = -10: this is our initial timestamp.
	canonicalStart := align.ForwardIfNotAligned(in.Datapoints[0].Ts, interval) - interval + in.Interval
	for ts := canonicalStart; ts < in.Datapoints[0].Ts; ts += in.Interval {
		datapoints = append(datapoints, schema.Point{Val: math.NaN(), Ts: ts})
	}
	datapoints = append(datapoints, in.Datapoints...)

	// for the desired last input ts, it's important to be aware of cases like this:
	// until=47, interval=10, in.Interval=5
	// a canonical 10s series would have 40 as its last point, whereas our input series will have 45, which would consolidate into a point with timestamp 50. that is incorrect:
	// it breaches `until`, and may have more points than other series it needs to be combined with.
	// thus, we also need to potentially trim points from the back, until the last point has the same ts a canonical series would have.
	canonicalTs := (datapoints[len(datapoints)-1].Ts / interval) * interval
	numDrop := int((datapoints[len(datapoints)-1].Ts - canonicalTs) / in.Interval)
	return datapoints[0 : len(datapoints)-numDrop]
}
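
// trimSketch is an illustrative sketch and not part of the original file: it
// walks through the trim arithmetic above for the until=47 example, where a
// hypothetical 5s series ending at ts 45 must drop one point so its copy ends
// at ts 40, the last timestamp a canonical 10s series would have.
func trimSketch() {
	var interval, inInterval, lastTs uint32 = 10, 5, 45
	canonicalTs := (lastTs / interval) * interval  // 45 / 10 * 10 = 40
	numDrop := (lastTs - canonicalTs) / inInterval // (45 - 40) / 5 = 1 point to drop
	fmt.Println(canonicalTs, numDrop)              // prints: 40 1
}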