Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache subrings returned by Ring.ShuffleShardWithLookback() #283

Merged
merged 9 commits into from
Apr 27, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
* [ENHANCEMENT] Memcached: Add support for using TLS or mTLS with Memcached based caching. #278
* [ENHANCEMENT] Ring: improve performance of shuffle sharding computation. #281
* [ENHANCEMENT] Add option to enable IPv6 address detection in ring and memberlist handling. #185
* [ENHANCEMENT] Ring: cache results of shuffle sharding with lookback where possible. #283
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
137 changes: 125 additions & 12 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ type Ring struct {

// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring
shuffledSubringCache map[subringCacheKey]*Ring
shuffledSubringWithLookbackCache map[subringCacheKey]cachedSubringWithLookback

numMembersGaugeVec *prometheus.GaugeVec
totalTokensGauge prometheus.Gauge
Expand All @@ -194,8 +195,15 @@ type Ring struct {
}

type subringCacheKey struct {
identifier string
shardSize int
identifier string
shardSize int
lookbackPeriod time.Duration
}

// cachedSubringWithLookback is one entry of Ring.shuffledSubringWithLookbackCache: a subring
// computed for a given lookback window, plus the range of lookback window start times for
// which that subring may be reused.
//
// Both bounds are compared against now.Add(-lookbackPeriod).Unix() by
// getCachedShuffledSubringWithLookback, and both are inclusive: the entry is rejected only
// when the requested start is strictly below the lower bound or strictly above the upper bound.
type cachedSubringWithLookback struct {
// subring is the cached subring returned to callers on a cache hit.
subring *Ring
// Lower bound: set by setCachedShuffledSubringWithLookback to the start of the lookback
// window the subring was computed for.
validForLookbackWindowsStartingAfter int64 // if the lookback window is from T to S, validForLookbackWindowsStartingAfter is the earliest value of T this cache entry is valid for
// Upper bound: set to the smallest RegisteredTimestamp among the subring's instances that
// registered at or after the window start, or math.MaxInt64 when there is no such instance.
validForLookbackWindowsStartingBefore int64 // if the lookback window is from T to S, validForLookbackWindowsStartingBefore is the latest value of T this cache entry is valid for
}

// New creates a new Ring. Being a service, Ring needs to be started to do anything.
Expand All @@ -221,12 +229,13 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client
}

r := &Ring{
key: key,
cfg: cfg,
KVClient: store,
strategy: strategy,
ringDesc: &Desc{},
shuffledSubringCache: map[subringCacheKey]*Ring{},
key: key,
cfg: cfg,
KVClient: store,
strategy: strategy,
ringDesc: &Desc{},
shuffledSubringCache: map[subringCacheKey]*Ring{},
shuffledSubringWithLookbackCache: map[subringCacheKey]cachedSubringWithLookback{},
numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_members",
Help: "Number of members in the ring",
Expand Down Expand Up @@ -324,10 +333,15 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.ringZones = ringZones
r.oldestRegisteredTimestamp = oldestRegisteredTimestamp
r.lastTopologyChange = now

// Invalidate all cached subrings.
if r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
if r.shuffledSubringWithLookbackCache != nil {
r.shuffledSubringWithLookbackCache = make(map[subringCacheKey]cachedSubringWithLookback)
}

r.updateRingMetrics(rc)
}

Expand Down Expand Up @@ -623,14 +637,25 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
// The returned subring may be unbalanced with regard to zones and should never be used for write
// operations (read only).
//
// This function doesn't support caching.
// This function supports caching, but the cache will only be effective if successive calls for the
// same identifier are for increasing values of (now-lookbackPeriod).
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

return r.shuffleShard(identifier, size, lookbackPeriod, now)
if cached := r.getCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, lookbackPeriod, now)

if result != r {
r.setCachedShuffledSubringWithLookback(identifier, size, lookbackPeriod, now, result)
}

return result
}

func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring {
Expand Down Expand Up @@ -877,6 +902,88 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri
}
}

func (r *Ring) getCachedShuffledSubringWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring {
if r.cfg.SubringCacheDisabled {
return nil
}

cached, ok := r.shuffledSubringWithLookbackCache[subringCacheKey{identifier: identifier, shardSize: size, lookbackPeriod: lookbackPeriod}]
if !ok {
return nil
}

lookbackWindowStart := now.Add(-lookbackPeriod).Unix()
if lookbackWindowStart < cached.validForLookbackWindowsStartingAfter || lookbackWindowStart > cached.validForLookbackWindowsStartingBefore {
// The cached subring is not valid for the lookback window that has been requested.
return nil
}

cachedSubring := cached.subring
charleskorn marked this conversation as resolved.
Show resolved Hide resolved

// No need to update the cached subring if it is the original ring itself.
if r == cachedSubring {
return cachedSubring
}

cachedSubring.mtx.Lock()
defer cachedSubring.mtx.Unlock()

// Update instance states and timestamps. We know that the topology is the same,
// so zones and tokens are equal.
for name, cachedIng := range cachedSubring.ringDesc.Ingesters {
ing := r.ringDesc.Ingesters[name]
cachedIng.State = ing.State
cachedIng.Timestamp = ing.Timestamp
cachedSubring.ringDesc.Ingesters[name] = cachedIng
}

return cachedSubring
}

// setCachedShuffledSubringWithLookback records subring in the lookback-aware subring cache
// under the (identifier, size, lookbackPeriod) key.
//
// The entry is stored as valid for lookback windows starting anywhere from
// now - lookbackPeriod (Unix seconds) up to the earliest registration time of any subring
// instance that registered inside that window, or math.MaxInt64 if there is none.
//
// Nothing is stored when subring is nil, caching is disabled, the cache map is nil, the ring
// topology changed after subring was computed, or an existing entry for the same key already
// covers a lookback window starting at or after this one.
func (r *Ring) setCachedShuffledSubringWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time, subring *Ring) {
	if r.cfg.SubringCacheDisabled || subring == nil {
		return
	}

	windowStart := now.Add(-lookbackPeriod).Unix()

	// Find the earliest registration that happened inside the lookback window; the entry
	// cannot be reused for windows starting after that moment.
	latestValidStart := int64(math.MaxInt64)
	for _, inst := range subring.ringDesc.Ingesters {
		registeredAt := inst.RegisteredTimestamp
		if registeredAt < windowStart {
			// Registered before the window started.
			continue
		}
		if registeredAt < latestValidStart {
			latestValidStart = registeredAt
		}
	}

	r.mtx.Lock()
	defer r.mtx.Unlock()

	// Skip caching if the cache map is nil (which can only be set by tests), or if *this*
	// ring changed since the result was computed, which can happen between releasing the
	// read lock and acquiring the read-write lock.
	if r.shuffledSubringWithLookbackCache == nil || !r.lastTopologyChange.Equal(subring.lastTopologyChange) {
		return
	}

	cacheKey := subringCacheKey{identifier: identifier, shardSize: size, lookbackPeriod: lookbackPeriod}

	// Keep an existing entry whose lookback window starts at or after ours. Replacing only
	// with strictly later windows prevents cache thrashing between calls whose windows start
	// before and after the time of an instance registering.
	if previous, found := r.shuffledSubringWithLookbackCache[cacheKey]; found && previous.validForLookbackWindowsStartingAfter >= windowStart {
		return
	}

	r.shuffledSubringWithLookbackCache[cacheKey] = cachedSubringWithLookback{
		subring:                               subring,
		validForLookbackWindowsStartingAfter:  windowStart,
		validForLookbackWindowsStartingBefore: latestValidStart,
	}
}

func (r *Ring) CleanupShuffleShardCache(identifier string) {
if r.cfg.SubringCacheDisabled {
return
Expand All @@ -890,6 +997,12 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) {
delete(r.shuffledSubringCache, k)
}
}

for k := range r.shuffledSubringWithLookbackCache {
if k.identifier == identifier {
delete(r.shuffledSubringWithLookbackCache, k)
}
}
}

func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
Expand Down
Loading