receive: fail early if the ketama hashring is configured with fewer nodes than the replication factor #6168

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6171](https://github.com/thanos-io/thanos/pull/6171) Store: fix error handling on limits.

### Changed
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make the ketama hashring fail early when configured with fewer nodes than the replication factor.

### Removed

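For illustration only (not part of the diff): a minimal sketch of what "fail early" means here, using the identifiers visible in pkg/receive/hashring.go below. The literal config values are hypothetical.

// Sketch: one endpoint cannot satisfy a replication factor of 3, so the
// hashring construction itself now returns an error instead of deferring
// the failure to request time.
cfg := []HashringConfig{{Hashring: "test", Endpoints: []string{"node-1"}}}
_, err := newMultiHashring(AlgorithmKetama, 3, cfg)
// err is non-nil: the undersized ketama configuration is rejected up front.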
24 changes: 18 additions & 6 deletions pkg/receive/handler_test.go
@@ -160,7 +160,7 @@ func (f *fakeAppender) Rollback() error {
return f.rollbackErr()
}

func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) {
func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring, error) {
var (
cfg = []HashringConfig{{Hashring: "test"}}
handlers []*Handler
@@ -202,11 +202,14 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
hashringAlgo = AlgorithmHashmod
}

hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg)
hashring, err := newMultiHashring(hashringAlgo, replicationFactor, cfg)
if err != nil {
return nil, nil, err
}
for _, h := range handlers {
h.Hashring(hashring)
}
return handlers, hashring
return handlers, hashring, nil
}

func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) {
@@ -576,7 +579,10 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist
},
} {
t.Run(tc.name, func(t *testing.T) {
handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
handlers, hashring, err := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
if err != nil {
Contributor: We usually use testutil.Ok.

Contributor (author): I chose that to be consistent with the other assertions in this test. It's probably best to update all of them. While I'm at it, should I also convert them to the testutil.Ok form?

Contributor: I think that's a good point; let's leave it as it is for consistency, then.
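For illustration, a minimal sketch of the testutil.Ok style discussed in this thread, assuming the testutil package this test file already uses exposes Ok; the PR keeps the t.Fatalf form shown below for consistency with the surrounding assertions.

// Hypothetical alternative to the t.Fatalf check below, not part of this PR:
handlers, hashring, err := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
testutil.Ok(t, err) // fails the test immediately if the hashring could not be built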

t.Fatalf("unable to create test handler: %v", err)
}
tenant := "test"
// Test from the point of view of every node
// so that we know status code does not depend
@@ -706,7 +712,10 @@ func TestReceiveWriteRequestLimits(t *testing.T) {
appender: newFakeAppender(nil, nil, nil),
},
}
handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod)
handlers, _, err := newTestHandlerHashring(appendables, 3, AlgorithmHashmod)
if err != nil {
t.Fatalf("unable to create test handler: %v", err)
}
handler := handlers[0]

tenant := "test"
@@ -915,7 +924,10 @@ func makeSeriesWithValues(numSeries int) []prompb.TimeSeries {
func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
dir := b.TempDir()

handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod)
handlers, _, err := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod)
if err != nil {
b.Fatalf("unable to create test handler: %v", err)
}
handler := handlers[0]

reg := prometheus.NewRegistry()
35 changes: 24 additions & 11 deletions pkg/receive/hashring.go
@@ -110,9 +110,14 @@ type ketamaHashring struct {
numEndpoints uint64
}

func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) *ketamaHashring {
func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
numSections := len(endpoints) * sectionsPerNode

if len(endpoints) < int(replicationFactor) {
return nil, errors.New("ketama: the number of endpoints must be at least as large as the replication factor")
}

hash := xxhash.New()
ringSections := make(sections, 0, numSections)
for endpointIndex, endpoint := range endpoints {
@@ -135,7 +140,7 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
endpoints: endpoints,
sections: ringSections,
numEndpoints: uint64(len(endpoints)),
}
}, nil
}

// calculateSectionReplicas pre-calculates replicas for each section,
@@ -234,17 +239,21 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) Hashring {
func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) {
m := &multiHashring{
cache: make(map[string]Hashring),
}

for _, h := range cfg {
var hashring Hashring
var err error
if h.Algorithm != "" {
hashring = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
hashring, err = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
} else {
hashring = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
hashring, err = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
}
if err != nil {
return nil, err
}
m.hashrings = append(m.hashrings, hashring)
var t map[string]struct{}
@@ -256,7 +265,7 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
}
m.tenantSets = append(m.tenantSets, t)
}
return m
return m, nil
}

// HashringFromConfigWatcher creates multi-tenant hashrings from a
@@ -276,7 +285,11 @@ func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm,
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- newMultiHashring(algorithm, replicationFactor, cfg)
h, err := newMultiHashring(algorithm, replicationFactor, cfg)
if err != nil {
return errors.Wrap(err, "unable to create new hashring from config")
}
updates <- h
case <-ctx.Done():
return ctx.Err()
}
@@ -295,20 +308,20 @@ func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, c
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(algorithm, replicationFactor, config), err
return newMultiHashring(algorithm, replicationFactor, config)
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) Hashring {
func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
return simpleHashring(endpoints), nil
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
l := log.NewNopLogger()
level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.",
"hashring", hashring,
"tenants", tenants)
return simpleHashring(endpoints)
return simpleHashring(endpoints), nil
}
}
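For illustration only (not part of the diff): a minimal sketch of how the two algorithms now differ when built through newHashring above; only ketama validates the endpoint count, while hashmod still accepts any endpoint list. The literal arguments are hypothetical.

endpoints := []string{"node-1"}
// Ketama rejects the configuration: one endpoint cannot satisfy a replication factor of 2.
_, err := newHashring(AlgorithmKetama, endpoints, 2, "test", nil) // err != nil
// Hashmod performs no such check and still returns a usable hashring.
_, err = newHashring(AlgorithmHashmod, endpoints, 2, "test", nil) // err == nil
_ = err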
17 changes: 14 additions & 3 deletions pkg/receive/hashring_test.go
@@ -136,7 +136,9 @@ func TestHashringGet(t *testing.T) {
},
},
} {
hs := newMultiHashring(AlgorithmHashmod, 3, tc.cfg)
hs, err := newMultiHashring(AlgorithmHashmod, 3, tc.cfg)
require.NoError(t, err)

h, err := hs.Get(tc.tenant, ts)
if tc.nodes != nil {
if err != nil {
@@ -226,7 +228,8 @@ func TestKetamaHashringGet(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hashRing := newKetamaHashring(test.nodes, 10, test.n+1)
hashRing, err := newKetamaHashring(test.nodes, 10, test.n+1)
require.NoError(t, err)

result, err := hashRing.GetN("tenant", test.ts, test.n)
require.NoError(t, err)
@@ -235,6 +238,11 @@
}
}

func TestKetamaHashringBadConfigIsRejected(t *testing.T) {
_, err := newKetamaHashring([]string{"node-1"}, 1, 2)
require.Error(t, err)
}

func TestKetamaHashringConsistency(t *testing.T) {
series := makeSeries()

@@ -348,7 +356,10 @@ func assignSeries(series []prompb.TimeSeries, nodes []string) (map[string][]prom
}

func assignReplicatedSeries(series []prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]prompb.TimeSeries, error) {
hashRing := newKetamaHashring(nodes, SectionsPerNode, replicas)
hashRing, err := newKetamaHashring(nodes, SectionsPerNode, replicas)
if err != nil {
return nil, err
}
assignments := make(map[string][]prompb.TimeSeries)
for i := uint64(0); i < replicas; i++ {
for _, ts := range series {