diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a1a8b54a..eaaab4984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97 * [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95 * [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107 +* [ENHANCEMENT] Ring: Add ring page handler to BasicLifecycler and Lifecycler. #112 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 diff --git a/ring/basic_lifecycler.go b/ring/basic_lifecycler.go index 726a85430..32775c982 100644 --- a/ring/basic_lifecycler.go +++ b/ring/basic_lifecycler.go @@ -3,6 +3,7 @@ package ring import ( "context" "fmt" + "net/http" "sort" "sync" "time" @@ -491,3 +492,20 @@ func (l *BasicLifecycler) run(fn func() error) error { return <-errCh } } + +func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return l.store.CAS(ctx, l.ringKey, f) +} + +func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := l.store.Get(ctx, l.ringKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req) +} diff --git a/ring/http.go b/ring/http.go index f23f08b81..1d6c10e80 100644 --- a/ring/http.go +++ b/ring/http.go @@ -10,8 +10,6 @@ import ( "sort" "strings" "time" - - "github.com/go-kit/log/level" ) const pageContent = ` @@ -90,19 +88,6 @@ func init() { pageTemplate = template.Must(t.Parse(pageContent)) } -func (r *Ring) forget(ctx context.Context, id string) error { - unregister := func(in interface{}) (out interface{}, retry bool, err error) { - if in == nil { - return nil, false, fmt.Errorf("found empty ring when trying to unregister") - } - - ringDesc := in.(*Desc) - ringDesc.RemoveIngester(id) - return ringDesc, true, nil - } - return r.KVClient.CAS(ctx, r.key, unregister) -} - type ingesterDesc struct { ID string `json:"id"` State string `json:"state"` @@ -121,11 +106,33 @@ type httpResponse struct { ShowTokens bool `json:"-"` } -func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { +type ringAccess interface { + casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error + getRing(context.Context) (*Desc, error) +} + +type ringPageHandler struct { + r ringAccess + heartbeatPeriod time.Duration +} + +func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler { + return &ringPageHandler{ + r: r, + heartbeatPeriod: heartbeatPeriod, + } +} + +func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { if req.Method == http.MethodPost { ingesterID := req.FormValue("forget") - if err := r.forget(req.Context(), ingesterID); err != nil { - level.Error(r.logger).Log("msg", "error forgetting instance", "err", err) + if err := h.forget(req.Context(), ingesterID); err != nil { + http.Error( + w, + fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(), + http.StatusInternalServerError, + ) + return } // Implement PRG pattern to prevent double-POST and work with CSRF middleware. @@ -140,23 +147,26 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - r.mtx.RLock() - defer r.mtx.RUnlock() + ringDesc, err := h.r.getRing(req.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, ownedTokens := ringDesc.countTokens() ingesterIDs := []string{} - for id := range r.ringDesc.Ingesters { + for id := range ringDesc.Ingesters { ingesterIDs = append(ingesterIDs, id) } sort.Strings(ingesterIDs) now := time.Now() var ingesters []ingesterDesc - _, owned := r.countTokens() for _, id := range ingesterIDs { - ing := r.ringDesc.Ingesters[id] + ing := ringDesc.Ingesters[id] heartbeatTimestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(&ing, Reporting, now) { + if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { state = unhealthy } @@ -175,7 +185,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { Tokens: ing.Tokens, Zone: ing.Zone, NumTokens: len(ing.Tokens), - Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, + Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, }) } @@ -203,6 +213,19 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ } } +func (h *ringPageHandler) forget(ctx context.Context, id string) error { + unregister := func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*Desc) + ringDesc.RemoveIngester(id) + return ringDesc, true, nil + } + return h.r.casRing(ctx, unregister) +} + // WriteJSONResponse writes some JSON as a HTTP response. func writeJSONResponse(w http.ResponseWriter, v httpResponse) { w.Header().Set("Content-Type", "application/json") diff --git a/ring/lifecycler.go b/ring/lifecycler.go index be103e1fb..e8d1655e7 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net/http" "os" "sort" "sync" @@ -849,6 +850,23 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { time.Sleep(i.cfg.FinalSleep) } +func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return i.KVStore.CAS(ctx, i.RingKey, f) +} + +func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := i.KVStore.Get(ctx, i.RingKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req) +} + // unregister removes our entry from consul. func (i *Lifecycler) unregister(ctx context.Context) error { level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName) diff --git a/ring/ring.go b/ring/ring.go index 6aaf165bf..5553c6b72 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -8,11 +8,13 @@ import ( "fmt" "math" "math/rand" + "net/http" "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -513,27 +515,32 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro } // countTokens returns the number of tokens and tokens within the range for each instance. -// The ring read lock must be already taken when calling this function. -func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { - owned := map[string]uint32{} - numTokens := map[string]uint32{} - for i, token := range r.ringTokens { +func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { + var ( + owned = map[string]uint32{} + numTokens = map[string]uint32{} + + ringTokens = r.GetTokens() + ringInstanceByToken = r.getTokensInfo() + ) + + for i, token := range ringTokens { var diff uint32 // Compute how many tokens are within the range. - if i+1 == len(r.ringTokens) { - diff = (math.MaxUint32 - token) + r.ringTokens[0] + if i+1 == len(ringTokens) { + diff = (math.MaxUint32 - token) + ringTokens[0] } else { - diff = r.ringTokens[i+1] - token + diff = ringTokens[i+1] - token } - info := r.ringInstanceByToken[token] + info := ringInstanceByToken[token] numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } // Set to 0 the number of owned tokens by instances which don't have tokens yet. - for id := range r.ringDesc.Ingesters { + for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 numTokens[id] = 0 @@ -582,7 +589,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { prevOwners := r.reportedOwners r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.countTokens() + numTokens, ownedRange := r.ringDesc.countTokens() for id, totalOwned := range ownedRange { r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) @@ -840,6 +847,23 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) { } } +func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return r.KVClient.CAS(ctx, r.key, f) +} + +func (r *Ring) getRing(ctx context.Context) (*Desc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + ringDesc := proto.Clone(r.ringDesc).(*Desc) + + return ringDesc, nil +} + +func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req) +} + // Operation describes which instances can be included in the replica set, based on their state. // // Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.