Skip to content

Commit

Permalink
Fix bug with counting of clusters for deleted elected replicas. (#4336)
Browse files Browse the repository at this point in the history
* Fix bug with counting of clusters for deleted elected replicas.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Make sure we delete user clusters' map if user has no more clusters.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added PR number.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany committed Jul 2, 2021
1 parent 297fb62 commit ff0d1a6
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update the number of clusters per user correctly. #4336

## 1.10.0-rc.0 / 2021-06-28

Expand Down
21 changes: 16 additions & 5 deletions pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ type haTracker struct {
limits haTrackerLimits

electedLock sync.RWMutex
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
clusters map[string]int // Number of clusters with elected replicas that a single user has. Key = user.
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
clusters map[string]map[string]struct{} // Known clusters with elected replicas per user. First key = user, second key = cluster name.

electedReplicaChanges *prometheus.CounterVec
electedReplicaTimestamp *prometheus.GaugeVec
Expand All @@ -135,7 +135,7 @@ func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Re
updateTimeoutJitter: jitter,
limits: limits,
elected: map[string]ReplicaDesc{},
clusters: map[string]int{},
clusters: map[string]map[string]struct{}{},

electedReplicaChanges: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ha_tracker_elected_replica_changes_total",
Expand Down Expand Up @@ -226,6 +226,14 @@ func (c *haTracker) loop(ctx context.Context) error {
delete(c.elected, key)
c.electedReplicaChanges.DeleteLabelValues(user, cluster)
c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)

userClusters := c.clusters[user]
if userClusters != nil {
delete(userClusters, cluster)
if len(userClusters) == 0 {
delete(c.clusters, user)
}
}
return true
}

Expand All @@ -234,7 +242,10 @@ func (c *haTracker) loop(ctx context.Context) error {
c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
}
if !exists {
c.clusters[user]++
if c.clusters[user] == nil {
c.clusters[user] = map[string]struct{}{}
}
c.clusters[user][cluster] = struct{}{}
}
c.elected[key] = *replica
c.electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
Expand Down Expand Up @@ -359,7 +370,7 @@ func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica s

c.electedLock.RLock()
entry, ok := c.elected[key]
clusters := c.clusters[userID]
clusters := len(c.clusters[userID])
c.electedLock.RUnlock()

if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
Expand Down
44 changes: 43 additions & 1 deletion pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,28 @@ func TestHAClustersLimit(t *testing.T) {

assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b2", now))
waitForClustersUpdate(t, 2, t1, userID)

// Mark cluster "a" for deletion (it was last updated 5 seconds ago)
// We use seconds timestamp resolution here, to avoid cleaning up 'b'. (In KV store, we only store seconds).
t1.cleanupOldReplicas(context.Background(), time.Unix(now.Unix(), 0))
waitForClustersUpdate(t, 1, t1, userID)

// Now adding cluster "c" works.
assert.NoError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now))
waitForClustersUpdate(t, 2, t1, userID)

// But yet another cluster doesn't.
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), "too many HA clusters (limit: 2)")

now = now.Add(5 * time.Second)

// clean all replicas
t1.cleanupOldReplicas(context.Background(), now)
waitForClustersUpdate(t, 0, t1, userID)

// Now "a" works again.
assert.NoError(t, t1.checkReplica(context.Background(), userID, "a", "a1", now))
waitForClustersUpdate(t, 1, t1, userID)
}

func waitForClustersUpdate(t *testing.T, expected int, tr *haTracker, userID string) {
Expand All @@ -560,7 +582,7 @@ func waitForClustersUpdate(t *testing.T, expected int, tr *haTracker, userID str
tr.electedLock.RLock()
defer tr.electedLock.RUnlock()

return tr.clusters[userID]
return len(tr.clusters[userID])
})
}

Expand Down Expand Up @@ -686,26 +708,31 @@ func TestCheckReplicaCleanup(t *testing.T) {

// Replica is not marked for deletion yet.
checkReplicaDeletionState(t, time.Second, c, userID, cluster, true, true, false)
checkUserClusters(t, time.Second, c, userID, 1)

// This will mark replica for deletion (with time.Now())
c.cleanupOldReplicas(ctx, now.Add(1*time.Second))

// Verify marking for deletion.
checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, true, true)
checkUserClusters(t, time.Second, c, userID, 0)

// This will "revive" the replica.
now = time.Now()
err = c.checkReplica(context.Background(), userID, cluster, replica, now)
assert.NoError(t, err)
checkReplicaTimestamp(t, time.Second, c, userID, cluster, replica, now) // This also checks that entry is not marked for deletion.
checkUserClusters(t, time.Second, c, userID, 1)

// This will mark replica for deletion again (with new time.Now())
c.cleanupOldReplicas(ctx, now.Add(1*time.Second))
checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, true, true)
checkUserClusters(t, time.Second, c, userID, 0)

// Delete entry marked for deletion completely.
c.cleanupOldReplicas(ctx, time.Now().Add(5*time.Second))
checkReplicaDeletionState(t, time.Second, c, userID, cluster, false, false, false)
checkUserClusters(t, time.Second, c, userID, 0)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total Number of elected replicas marked for deletion.
Expand All @@ -725,6 +752,21 @@ func TestCheckReplicaCleanup(t *testing.T) {
))
}

// checkUserClusters polls the tracker until the number of known clusters
// for the given user matches expectedClusters, failing the test if the
// condition is not met within the given duration.
func checkUserClusters(t *testing.T, duration time.Duration, c *haTracker, user string, expectedClusters int) {
	t.Helper()
	test.Poll(t, duration, nil, func() interface{} {
		// Take a read lock while inspecting the shared clusters map.
		c.electedLock.RLock()
		got := len(c.clusters[user])
		c.electedLock.RUnlock()

		if got == expectedClusters {
			return nil
		}

		return fmt.Errorf("expected clusters: %d, got %d", expectedClusters, got)
	})
}

func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *haTracker, user, cluster string, expectedExistsInMemory, expectedExistsInKV, expectedMarkedForDeletion bool) {
key := fmt.Sprintf("%s/%s", user, cluster)

Expand Down

0 comments on commit ff0d1a6

Please sign in to comment.