Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reducing the memory footprint of active series custom trackers #2568

Merged
merged 16 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type seriesStripe struct {
// seriesEntry holds a timestamp for single series.
type seriesEntry struct {
lbs labels.Labels
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches []bool // Which matchers of Matchers does this series match
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
matches PreAllocDynamicSlice // Index of the matcher matching
}

func NewActiveSeries(asm *Matchers, timeout time.Duration) *ActiveSeries {
Expand Down Expand Up @@ -191,12 +191,11 @@ func (s *seriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series lab
}

matches := s.matchers.Matches(series)
matchesLen := matches.len()

s.active++
for i, ok := range matches {
if ok {
s.activeMatching[i]++
}
for i := 0; i < matchesLen; i++ {
s.activeMatching[matches.get(i)]++
}

e := seriesEntry{
Expand Down Expand Up @@ -260,10 +259,9 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
}

s.active++
for i, ok := range entries[0].matches {
if ok {
s.activeMatching[i]++
}
ml := entries[0].matches.len()
for i := 0; i < ml; i++ {
s.activeMatching[entries[0].matches.get(i)]++
}
if ts < oldest {
oldest = ts
Expand Down Expand Up @@ -292,10 +290,9 @@ func (s *seriesStripe) purge(keepUntil time.Time) {
} else {
s.active += cnt
for i := range entries {
for i, ok := range entries[i].matches {
if ok {
s.activeMatching[i]++
}
ml := entries[i].matches.len()
for i := 0; i < ml; i++ {
s.activeMatching[entries[i].matches.get(i)]++
}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/activeseries/custom_trackers_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"gopkg.in/yaml.v3"
)

const maxNumberOfTrackers = 64000
gubjanos marked this conversation as resolved.
Show resolved Hide resolved

// CustomTrackersConfig configures active series custom trackers.
// It can be set using a flag, or parsed from yaml.
type CustomTrackersConfig struct {
Expand Down Expand Up @@ -142,6 +144,9 @@ func (c CustomTrackersConfig) MarshalYAML() (interface{}, error) {
func NewCustomTrackersConfig(m map[string]string) (c CustomTrackersConfig, err error) {
c.source = m
c.config = map[string]labelsMatchers{}
if len(m) > maxNumberOfTrackers {
return c, fmt.Errorf("the number of trackers set [%d] exceeds the maximum number of trackers [%d]", len(m), maxNumberOfTrackers)
}
for name, matcher := range m {
sm, err := amlabels.ParseMatchers(matcher)
if err != nil {
Expand Down
32 changes: 31 additions & 1 deletion pkg/ingester/activeseries/custom_trackers_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package activeseries

import (
"bytes"
"flag"
"fmt"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -76,7 +78,7 @@ func TestCustomTrackersConfigs(t *testing.T) {
},
{
name: "whitespaces are trimmed from name and matcher",
flags: []string{`-ingester.active-series-custom-trackers= foo : {foo="bar"}` + "\n "},
flags: []string{`-ingester.active-series-custom-trackers= foo : {foo="bar"}` + "\n "},
expected: mustNewCustomTrackersConfigFromMap(t, map[string]string{`foo`: `{foo="bar"}`}),
},
{
Expand Down Expand Up @@ -125,6 +127,34 @@ func TestCustomTrackersConfigs(t *testing.T) {
}
}

func TestMaximumNumberOfTrackers(t *testing.T) {
t.Run("Flag based setup", func(t *testing.T) {
var flagToSet bytes.Buffer
numberOfTrackers := maxNumberOfTrackers + 1
for i := 0; i < numberOfTrackers; i++ {
flagToSet.WriteString(fmt.Sprintf("name%d:{__name__=%d}", i, i))
if i < numberOfTrackers-1 {
flagToSet.WriteString(";")
}
}

c := CustomTrackersConfig{}
err := c.Set(flagToSet.String())
require.Error(t, err, "custom tracker config should not accept more than %d trackers", maxNumberOfTrackers)
})

t.Run("Map based setup", func(t *testing.T) {
configMap := map[string]string{}
numberOfTrackers := maxNumberOfTrackers + 1
for i := 0; i < numberOfTrackers; i++ {
configMap[fmt.Sprintf("name%d", i)] = fmt.Sprintf("{__name__=%d}", i)
}

_, err := NewCustomTrackersConfig(configMap)
require.Error(t, err, "custom tracker config should not accept more than %d trackers", maxNumberOfTrackers)
})
}

func TestCustomTrackerConfig_Equality(t *testing.T) {
configSets := [][]CustomTrackersConfig{
{
Expand Down
39 changes: 35 additions & 4 deletions pkg/ingester/activeseries/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ func (m *Matchers) Config() CustomTrackersConfig {
return m.cfg
}

func (m *Matchers) Matches(series labels.Labels) []bool {
// Matches returns a PreAllocDynamicSlice containing only matcher indexes which are matching
func (m *Matchers) Matches(series labels.Labels) PreAllocDynamicSlice {
if len(m.matchers) == 0 {
return nil
return PreAllocDynamicSlice{}
}
matches := make([]bool, len(m.matchers))
var matches PreAllocDynamicSlice
for i, sm := range m.matchers {
matches[i] = sm.Matches(series)
if sm.Matches(series) {
matches.append(i)
}
}
return matches
}
Expand Down Expand Up @@ -77,3 +80,31 @@ func amlabelMatcherToProm(m *amlabels.Matcher) *labels.Matcher {
// labels.MatchType(m.Type) is a risky conversion because it depends on the iota order, but we have a test for it
return labels.MustNewMatcher(labels.MatchType(m.Type), m.Name, m.Value)
}

const preAllocatedSize = 3

type PreAllocDynamicSlice struct {
gubjanos marked this conversation as resolved.
Show resolved Hide resolved
gubjanos marked this conversation as resolved.
Show resolved Hide resolved
arr [preAllocatedSize]uint16
arrl byte
rest []uint16
}

func (fs *PreAllocDynamicSlice) append(val int) {
if fs.arrl < preAllocatedSize {
fs.arr[fs.arrl] = uint16(val)
fs.arrl++
return
}
fs.rest = append(fs.rest, uint16(val))
}

func (fs *PreAllocDynamicSlice) get(idx int) uint16 {
if idx < preAllocatedSize {
return fs.arr[idx]
}
return fs.rest[idx-preAllocatedSize]
}

func (fs *PreAllocDynamicSlice) len() int {
return int(fs.arrl) + len(fs.rest)
}
101 changes: 69 additions & 32 deletions pkg/ingester/activeseries/matchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package activeseries

import (
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -22,70 +24,105 @@ func TestMatcher_MatchesSeries(t *testing.T) {

for _, tc := range []struct {
series labels.Labels
expected []bool
expected []int
}{
{
series: labels.Labels{{Name: "foo", Value: "true"}, {Name: "baz", Value: "unrelated"}},
expected: []bool{
false, // bar_starts_with_1
false, // does_not_have_foo_label
false, // has_foo_and_bar_starts_with_1
true, // has_foo_label
expected: []int{
3, // has_foo_label
},
},
{
series: labels.Labels{{Name: "foo", Value: "true"}, {Name: "bar", Value: "100"}, {Name: "baz", Value: "unrelated"}},
expected: []bool{
true, // bar_starts_with_1
false, // does_not_have_foo_label
true, // has_foo_and_bar_starts_with_1
true, // has_foo_label
expected: []int{
0, // bar_starts_with_1
2, // has_foo_and_bar_starts_with_1
3, // has_foo_label
},
},
{
series: labels.Labels{{Name: "foo", Value: "true"}, {Name: "bar", Value: "200"}, {Name: "baz", Value: "unrelated"}},
expected: []bool{
false, // bar_starts_with_1
false, // does_not_have_foo_label
false, // has_foo_and_bar_starts_with_1
true, // has_foo_label
expected: []int{
3, // has_foo_label
},
},
{
series: labels.Labels{{Name: "bar", Value: "200"}, {Name: "baz", Value: "unrelated"}},
expected: []bool{
false, // bar_starts_with_1
true, // does_not_have_foo_label
false, // has_foo_and_bar_starts_with_1
false, // has_foo_label
expected: []int{
1, // does_not_have_foo_label
},
},
{
series: labels.Labels{{Name: "bar", Value: "100"}, {Name: "baz", Value: "unrelated"}},
expected: []bool{
true, // bar_starts_with_1
true, // does_not_have_foo_label
false, // has_foo_and_bar_starts_with_1
false, // has_foo_label
expected: []int{
0, // bar_starts_with_1
1, // does_not_have_foo_label
},
},
{
series: labels.Labels{{Name: "baz", Value: "unrelated"}},
expected: []bool{
false, // bar_starts_with_1
true, // does_not_have_foo_label
false, // has_foo_and_bar_starts_with_1
false, // has_foo_label
expected: []int{
1, // does_not_have_foo_label
},
},
} {
t.Run(tc.series.String(), func(t *testing.T) {
got := asm.Matches(tc.series)
assert.Equal(t, tc.expected, got)
assert.Equal(t, tc.expected, preAllocDynamicSliceToSlice(got))
})
}
}

func preAllocDynamicSliceToSlice(prealloc PreAllocDynamicSlice) []int {
slice := make([]int, prealloc.len())
for i := 0; i < prealloc.len(); i++ {
slice[i] = int(prealloc.get(i))
}
return slice
}

func BenchmarkMatchesSeries(b *testing.B) {

trackerCounts := []int{10, 100, 1000, 10000}
asms := make([]*Matchers, len(trackerCounts))

for i, matcherCount := range trackerCounts {
configMap := map[string]string{}
for j := 0; j < matcherCount; j++ {
configMap[strconv.Itoa(j)] = fmt.Sprintf("{grafanacloud_usage_group=~%d.*}", j)
}
config, _ := NewCustomTrackersConfig(configMap)
gubjanos marked this conversation as resolved.
Show resolved Hide resolved
asms[i] = NewMatchers(config)

}

labelCounts := []int{1, 10, 100}
series := make([]labels.Labels, len(labelCounts))
for i, labelCount := range labelCounts {
l := labels.Labels{
{Name: "grafanacloud_usage_group", Value: "1"}, // going to match exactly to one matcher
}
for j := 1; j < labelCount; j++ {
labelEntry := labels.Label{Name: fmt.Sprintf("foo%d", j), Value: "true"}
l = append(l, labelEntry)
}
series[i] = l
}

for i, trackerCount := range trackerCounts {
for j, labelCount := range labelCounts {
b.Run(fmt.Sprintf("TrackerCount: %d, LabelCount: %d", trackerCount, labelCount), func(b *testing.B) {
for x := 0; x < b.N; x++ {
got := asms[i].Matches(series[j])
if got.len() > 2 {
b.FailNow()
}
}
})
}
}
}

func TestCustomTrackersConfigs_MalformedMatcher(t *testing.T) {
for _, matcher := range []string{
`{foo}`,
Expand Down