Skip to content

Commit

Permalink
pkg/receive: add simple hashring
Browse files Browse the repository at this point in the history
  • Loading branch information
squat committed May 30, 2019
1 parent 981561a commit ae86f2c
Show file tree
Hide file tree
Showing 2 changed files with 363 additions and 0 deletions.
114 changes: 114 additions & 0 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package receive

import (
"errors"
"sort"

"github.com/improbable-eng/thanos/pkg/store/prompb"

"github.com/cespare/xxhash"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
)

const sep = '\xff'

// Hashring finds the correct host to handle a given time series
// for a specified tenant.
// It returns the hostname and any error encountered.
type Hashring interface {
GetHost(tenant string, timeSeries *prompb.TimeSeries) (string, error)
}

// Matcher determines whether or tenant matches a hashring.
type Matcher interface {
Match(tenant string, hashring string) bool
}

// MultiMatcher is a list of Matchers that implements the Matcher interface.
type MultiMatcher []Matcher

// Match implements the Matcher interface.
func (m MultiMatcher) Match(tenant, hashring string) bool {
for i := range m {
if !m[i].Match(tenant, hashring) {
return false
}
}
return true
}

// MatcherFunc is a shim to use a func as a Matcher.
type MatcherFunc func(string, string) bool

// Match implements the Matcher interface.
func (m MatcherFunc) Match(tenant, hashring string) bool {
return m(tenant, hashring)
}

// ExactMatcher is a matcher that checks if the tenant exactly matches the hashring name.
var ExactMatcher = MatcherFunc(func(tenant, hashring string) bool { return tenant == hashring })

// hash returns a hash for the given tenant and time series.
func hash(tenant string, ts *prompb.TimeSeries) uint64 {
// Sort labelset to ensure a stable hash.
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })

b := make([]byte, 0, 1024)
b = append(b, []byte(tenant)...)
b = append(b, sep)
for _, v := range ts.Labels {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}

// simpleHashring represents a group of hosts handling write requests.
type simpleHashring struct {
targetgroup.Group
}

// GetHost returns a hostname to handle the given tenant and time series.
func (s *simpleHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) {
// Always return nil here to implement the Hashring interface.
return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil
}

// matchingHashring represents a set of hashrings.
// Which hashring to use is determined by the matcher.
type matchingHashring struct {
cache map[string]Hashring
hashrings map[string]Hashring
matcher Matcher
}

// GetHost returns a hostname to handle the given tenant and time series.
func (m matchingHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) {
if h, ok := m.cache[tenant]; ok {
return h.GetHost(tenant, ts)
}
for name := range m.hashrings {
if m.matcher.Match(tenant, name) {
m.cache[tenant] = m.hashrings[name]
return m.hashrings[name].GetHost(tenant, ts)
}
}
return "", errors.New("no matching hosts to handle tenant")
}

// NewHashring creates a multi-tenant hashring for a given slice of
// groups. Which tenant's hashring to use is determined by the Matcher.
func NewHashring(matcher Matcher, groups []*targetgroup.Group) Hashring {
m := matchingHashring{
cache: make(map[string]Hashring),
hashrings: make(map[string]Hashring),
matcher: matcher,
}
for _, g := range groups {
m.hashrings[g.Source] = &simpleHashring{*g}
}
return m
}
249 changes: 249 additions & 0 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package receive

import (
"testing"

"github.com/improbable-eng/thanos/pkg/store/prompb"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
)

func TestHash(t *testing.T) {
ts := &prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "qux",
},
},
}

ts2 := &prompb.TimeSeries{
Labels: []prompb.Label{ts.Labels[1], ts.Labels[0]},
}

if hash("", ts) != hash("", ts2) {
t.Errorf("expected hashes to be independent of label order")
}
}

func TestGetHost(t *testing.T) {
ts := &prompb.TimeSeries{
Labels: []prompb.Label{
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "qux",
},
},
}

for _, tc := range []struct {
name string
cfg []*targetgroup.Group
hosts map[string]struct{}
tenant string
}{
{
name: "empty",
cfg: []*targetgroup.Group{},
tenant: "tenant1",
},
{
name: "simple",
cfg: []*targetgroup.Group{
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host1",
},
},
},
},
hosts: map[string]struct{}{"host1": struct{}{}},
},
{
name: "specific",
cfg: []*targetgroup.Group{
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host1",
},
},
Source: "",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host2",
},
},
Source: "tenant1",
},
},
hosts: map[string]struct{}{"host2": struct{}{}},
tenant: "tenant1",
},
{
name: "many tenants",
cfg: []*targetgroup.Group{
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host1",
},
},
Source: "tenant1",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host2",
},
},
Source: "tenant2",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host3",
},
},
Source: "tenant3",
},
},
hosts: map[string]struct{}{"host1": struct{}{}},
tenant: "tenant1",
},
{
name: "many tenants error",
cfg: []*targetgroup.Group{
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host1",
},
},
Source: "tenant1",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host2",
},
},
Source: "tenant2",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host3",
},
},
Source: "tenant3",
},
},
tenant: "tenant4",
},
{
name: "many hosts",
cfg: []*targetgroup.Group{
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host1",
},
model.LabelSet{
model.AddressLabel: "host2",
},
model.LabelSet{
model.AddressLabel: "host3",
},
},
Source: "tenant1",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host4",
},
model.LabelSet{
model.AddressLabel: "host5",
},
model.LabelSet{
model.AddressLabel: "host6",
},
},
Source: "",
},
},
hosts: map[string]struct{}{
"host1": struct{}{},
"host2": struct{}{},
"host3": struct{}{},
},
tenant: "tenant1",
},
{
name: "many hosts 2",
cfg: []*targetgroup.Group{
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host1",
},
model.LabelSet{
model.AddressLabel: "host2",
},
model.LabelSet{
model.AddressLabel: "host3",
},
},
Source: "tenant1",
},
{
Targets: []model.LabelSet{
model.LabelSet{
model.AddressLabel: "host4",
},
model.LabelSet{
model.AddressLabel: "host5",
},
model.LabelSet{
model.AddressLabel: "host6",
},
},
},
},
hosts: map[string]struct{}{
"host4": struct{}{},
"host5": struct{}{},
"host6": struct{}{},
},
},
} {
hs := NewHashring(ExactMatcher, tc.cfg)
h, err := hs.GetHost(tc.tenant, ts)
if tc.hosts != nil {
if err != nil {
t.Errorf("case %q: got unexpected error: %v", tc.name, err)
continue
}
if _, ok := tc.hosts[h]; !ok {
t.Errorf("case %q: got unexpected host %q", tc.name, h)
}
continue
}
if err == nil {
t.Errorf("case %q: expected error", tc.name)
}
}
}

0 comments on commit ae86f2c

Please sign in to comment.