Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'bloomberg/removeAboveBelowValue'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Aug 13, 2018
2 parents 0d56ef4 + b3ba612 commit 34febb0
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Here are the currently included functions:
| movingAverage(seriesLists, windowSize) seriesList | | Unstable |
| perSecond(seriesLists) seriesList | | Stable |
| rangeOfSeries(seriesList) series | | Stable |
| removeAboveValue(seriesList, n) seriesList | | Stable |
| removeBelowValue(seriesList, n) seriesList | | Stable |
| scale(seriesList, num) series | | Stable |
| scaleToSeconds(seriesList, seconds) series | | Stable |
| stddevSeries(seriesList) series | | Stable |
Expand Down
71 changes: 71 additions & 0 deletions expr/func_removeabovebelowvalue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package expr

import (
"fmt"
"math"

schema "gopkg.in/raintank/schema.v1"

"github.com/grafana/metrictank/api/models"
)

type FuncRemoveAboveBelowValue struct {
in GraphiteFunc
n float64
above bool
}

func NewRemoveAboveBelowValueConstructor(above bool) func() GraphiteFunc {
return func() GraphiteFunc {
return &FuncRemoveAboveBelowValue{above: above}
}
}

func (s *FuncRemoveAboveBelowValue) Signature() ([]Arg, []Arg) {

return []Arg{
ArgSeriesList{val: &s.in},
ArgFloat{key: "n", val: &s.n},
}, []Arg{ArgSeriesList{}}
}

func (s *FuncRemoveAboveBelowValue) Context(context Context) Context {
return context
}

func (s *FuncRemoveAboveBelowValue) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

var output []models.Series
for _, serie := range series {
if s.above {
serie.Target = fmt.Sprintf("removeAboveValue(%s, %g)", serie.Target, s.n)
} else {
serie.Target = fmt.Sprintf("removeBelowValue(%s, %g)", serie.Target, s.n)
}
serie.QueryPatt = serie.Target

out := pointSlicePool.Get().([]schema.Point)
for _, p := range serie.Datapoints {
if s.above {
if p.Val > s.n {
p.Val = math.NaN()
}
} else {
if p.Val < s.n {
p.Val = math.NaN()
}
}
out = append(out, p)
}
serie.Datapoints = out
output = append(output, serie)
}

cache[Req{}] = append(cache[Req{}], output...)

return output, nil
}
272 changes: 272 additions & 0 deletions expr/func_removeabovebelowvalue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package expr

import (
"math"
"math/rand"
"strconv"
"testing"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/test"
"gopkg.in/raintank/schema.v1"
)

func TestRemoveAboveValueSingleAllNonNull(t *testing.T) {
testRemoveAboveBelowValue(
"removeAboveValue",
true,
199,
[]models.Series{
{
Interval: 10,
QueryPatt: "abcd",
Target: "a",
Datapoints: getCopy(a),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "b",
Datapoints: getCopy(b),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "c",
Datapoints: getCopy(c),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "d",
Datapoints: getCopy(d),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "removeAboveValue(a, 199)",
Datapoints: []schema.Point{
{Val: 0, Ts: 10},
{Val: 0, Ts: 20},
{Val: 5.5, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "removeAboveValue(b, 199)",
Datapoints: []schema.Point{
{Val: 0, Ts: 10},
{Val: math.NaN(), Ts: 20},
{Val: math.NaN(), Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "removeAboveValue(c, 199)",
Datapoints: []schema.Point{
{Val: 0, Ts: 10},
{Val: 0, Ts: 20},
{Val: 1, Ts: 30},
{Val: 2, Ts: 40},
{Val: 3, Ts: 50},
{Val: 4, Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "removeAboveValue(d, 199)",
Datapoints: []schema.Point{
{Val: 0, Ts: 10},
{Val: 33, Ts: 20},
{Val: 199, Ts: 30},
{Val: 29, Ts: 40},
{Val: 80, Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
},
t,
)
}

func TestRemoveBelowValueSingleAllNonNull(t *testing.T) {
testRemoveAboveBelowValue(
"removeBelowValue",
false,
199,
[]models.Series{
{
Interval: 10,
QueryPatt: "abcd",
Target: "a",
Datapoints: getCopy(a),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "b",
Datapoints: getCopy(b),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "c",
Datapoints: getCopy(c),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "d",
Datapoints: getCopy(d),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "removeBelowValue(a, 199)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: math.NaN(), Ts: 20},
{Val: math.NaN(), Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: 1234567890, Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "removeBelowValue(b, 199)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: math.MaxFloat64, Ts: 20},
{Val: math.MaxFloat64 - 20, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: 1234567890, Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "removeBelowValue(c, 199)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: math.NaN(), Ts: 20},
{Val: math.NaN(), Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "removeBelowValue(d, 199)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: math.NaN(), Ts: 20},
{Val: 199, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: 250, Ts: 60},
},
},
},
t,
)
}

func testRemoveAboveBelowValue(name string, above bool, n float64, in []models.Series, out []models.Series, t *testing.T) {
f := NewRemoveAboveBelowValueConstructor(above)()
f.(*FuncRemoveAboveBelowValue).in = NewMock(in)
f.(*FuncRemoveAboveBelowValue).n = n
gots, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
t.Fatalf("case %q (%f): err should be nil. got %q", name, n, err)
}
if len(gots) != len(out) {
t.Fatalf("case %q (%f): isNonNull len output expected %d, got %d", name, n, len(out), len(gots))
}
for i, g := range gots {
exp := out[i]
if g.QueryPatt != exp.QueryPatt {
t.Fatalf("case %q (%f): expected target %q, got %q", name, n, exp.QueryPatt, g.QueryPatt)
}
if len(g.Datapoints) != len(exp.Datapoints) {
t.Fatalf("case %q (%f) len output expected %d, got %d", name, n, len(exp.Datapoints), len(g.Datapoints))
}
for j, p := range g.Datapoints {
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
continue
}
t.Fatalf("case %q (%f): output point %d - expected %v got %v", name, n, j, exp.Datapoints[j], p)
}
}
}
func BenchmarkRemoveAboveBelowValue10k_1NoNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 1, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkRemoveAboveBelowValue10k_10NoNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 10, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkRemoveAboveBelowValue10k_100NoNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 100, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkRemoveAboveBelowValue10k_1000NoNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 1000, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkRemoveAboveBelowValue10k_1SomeSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_10SomeSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_100SomeSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_1000SomeSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_1AllSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_10AllSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_100AllSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkRemoveAboveBelowValue10k_1000AllSeriesHalfNulls(b *testing.B) {
benchmarkRemoveAboveBelowValue(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func benchmarkRemoveAboveBelowValue(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
var input []models.Series
for i := 0; i < numSeries; i++ {
series := models.Series{
QueryPatt: strconv.Itoa(i),
}
if i%2 == 0 {
series.Datapoints = fn0()
} else {
series.Datapoints = fn1()
}
input = append(input, series)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
f := NewRemoveAboveBelowValueConstructor(rand.Int()%2 == 0)()
f.(*FuncRemoveAboveBelowValue).in = NewMock(input)
f.(*FuncRemoveAboveBelowValue).n = rand.Float64()
got, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
b.Fatalf("%s", err)
}
results = got
}
}
2 changes: 2 additions & 0 deletions expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func init() {
"movingAverage": {NewMovingAverage, false},
"perSecond": {NewPerSecond, true},
"rangeOfSeries": {NewAggregateConstructor("rangeOf", crossSeriesRange), true},
"removeAboveValue": {NewRemoveAboveBelowValueConstructor(true), true},
"removeBelowValue": {NewRemoveAboveBelowValueConstructor(false), true},
"scale": {NewScale, true},
"scaleToSeconds": {NewScaleToSeconds, true},
"smartSummarize": {NewSmartSummarize, false},
Expand Down

0 comments on commit 34febb0

Please sign in to comment.