diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go new file mode 100644 index 0000000000..1ca1c187ea --- /dev/null +++ b/pkg/receive/hashring.go @@ -0,0 +1,114 @@ +package receive + +import ( + "errors" + "sort" + + "github.com/improbable-eng/thanos/pkg/store/prompb" + + "github.com/cespare/xxhash" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +const sep = '\xff' + +// Hashring finds the correct host to handle a given time series +// for a specified tenant. +// It returns the hostname and any error encountered. +type Hashring interface { + GetHost(tenant string, timeSeries *prompb.TimeSeries) (string, error) +} + +// Matcher determines whether or tenant matches a hashring. +type Matcher interface { + Match(tenant string, hashring string) bool +} + +// MultiMatcher is a list of Matchers that implements the Matcher interface. +type MultiMatcher []Matcher + +// Match implements the Matcher interface. +func (m MultiMatcher) Match(tenant, hashring string) bool { + for i := range m { + if !m[i].Match(tenant, hashring) { + return false + } + } + return true +} + +// MatcherFunc is a shim to use a func as a Matcher. +type MatcherFunc func(string, string) bool + +// Match implements the Matcher interface. +func (m MatcherFunc) Match(tenant, hashring string) bool { + return m(tenant, hashring) +} + +// ExactMatcher is a matcher that checks if the tenant exactly matches the hashring name. +var ExactMatcher = MatcherFunc(func(tenant, hashring string) bool { return tenant == hashring }) + +// hash returns a hash for the given tenant and time series. +func hash(tenant string, ts *prompb.TimeSeries) uint64 { + // Sort labelset to ensure a stable hash. + sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) + + b := make([]byte, 0, 1024) + b = append(b, []byte(tenant)...) + b = append(b, sep) + for _, v := range ts.Labels { + b = append(b, v.Name...) + b = append(b, sep) + b = append(b, v.Value...) + b = append(b, sep) + } + return xxhash.Sum64(b) +} + +// simpleHashring represents a group of hosts handling write requests. +type simpleHashring struct { + targetgroup.Group +} + +// GetHost returns a hostname to handle the given tenant and time series. +func (s *simpleHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { + // Always return nil here to implement the Hashring interface. + return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil +} + +// matchingHashring represents a set of hashrings. +// Which hashring to use is determined by the matcher. +type matchingHashring struct { + cache map[string]Hashring + hashrings map[string]Hashring + matcher Matcher +} + +// GetHost returns a hostname to handle the given tenant and time series. +func (m matchingHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { + if h, ok := m.cache[tenant]; ok { + return h.GetHost(tenant, ts) + } + for name := range m.hashrings { + if m.matcher.Match(tenant, name) { + m.cache[tenant] = m.hashrings[name] + return m.hashrings[name].GetHost(tenant, ts) + } + } + return "", errors.New("no matching hosts to handle tenant") +} + +// NewHashring creates a multi-tenant hashring for a given slice of +// groups. Which tenant's hashring to use is determined by the Matcher. +func NewHashring(matcher Matcher, groups []*targetgroup.Group) Hashring { + m := matchingHashring{ + cache: make(map[string]Hashring), + hashrings: make(map[string]Hashring), + matcher: matcher, + } + for _, g := range groups { + m.hashrings[g.Source] = &simpleHashring{*g} + } + return m +} diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go new file mode 100644 index 0000000000..a3f2fad1a6 --- /dev/null +++ b/pkg/receive/hashring_test.go @@ -0,0 +1,249 @@ +package receive + +import ( + "testing" + + "github.com/improbable-eng/thanos/pkg/store/prompb" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +func TestHash(t *testing.T) { + ts := &prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "baz", + Value: "qux", + }, + }, + } + + ts2 := &prompb.TimeSeries{ + Labels: []prompb.Label{ts.Labels[1], ts.Labels[0]}, + } + + if hash("", ts) != hash("", ts2) { + t.Errorf("expected hashes to be independent of label order") + } +} + +func TestGetHost(t *testing.T) { + ts := &prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "baz", + Value: "qux", + }, + }, + } + + for _, tc := range []struct { + name string + cfg []*targetgroup.Group + hosts map[string]struct{} + tenant string + }{ + { + name: "empty", + cfg: []*targetgroup.Group{}, + tenant: "tenant1", + }, + { + name: "simple", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + }, + }, + hosts: map[string]struct{}{"host1": struct{}{}}, + }, + { + name: "specific", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host2", + }, + }, + Source: "tenant1", + }, + }, + hosts: map[string]struct{}{"host2": struct{}{}}, + tenant: "tenant1", + }, + { + name: "many tenants", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host2", + }, + }, + Source: "tenant2", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant3", + }, + }, + hosts: map[string]struct{}{"host1": struct{}{}}, + tenant: "tenant1", + }, + { + name: "many tenants error", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host2", + }, + }, + Source: "tenant2", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant3", + }, + }, + tenant: "tenant4", + }, + { + name: "many hosts", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + model.LabelSet{ + model.AddressLabel: "host2", + }, + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host4", + }, + model.LabelSet{ + model.AddressLabel: "host5", + }, + model.LabelSet{ + model.AddressLabel: "host6", + }, + }, + Source: "", + }, + }, + hosts: map[string]struct{}{ + "host1": struct{}{}, + "host2": struct{}{}, + "host3": struct{}{}, + }, + tenant: "tenant1", + }, + { + name: "many hosts 2", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + model.LabelSet{ + model.AddressLabel: "host2", + }, + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host4", + }, + model.LabelSet{ + model.AddressLabel: "host5", + }, + model.LabelSet{ + model.AddressLabel: "host6", + }, + }, + }, + }, + hosts: map[string]struct{}{ + "host4": struct{}{}, + "host5": struct{}{}, + "host6": struct{}{}, + }, + }, + } { + hs := NewHashring(ExactMatcher, tc.cfg) + h, err := hs.GetHost(tc.tenant, ts) + if tc.hosts != nil { + if err != nil { + t.Errorf("case %q: got unexpected error: %v", tc.name, err) + continue + } + if _, ok := tc.hosts[h]; !ok { + t.Errorf("case %q: got unexpected host %q", tc.name, h) + } + continue + } + if err == nil { + t.Errorf("case %q: expected error", tc.name) + } + } +}