Skip to content

Commit

Permalink
Add partitions ring support to Distributor.QueryStream()
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
pracucci and dimitarvdimitrov committed Feb 15, 2024
1 parent dde8a56 commit 3414ac9
Show file tree
Hide file tree
Showing 6 changed files with 602 additions and 41 deletions.
89 changes: 74 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -196,6 +197,7 @@ type Config struct {
StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"-"`
MinimizeIngesterRequests bool `yaml:"-"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"-"`
PreferAvailabilityZone string `yaml:"-"`

// IngestStorageConfig is dynamically injected because defined outside of distributor config.
IngestStorageConfig ingest.Config `yaml:"-"`
Expand Down Expand Up @@ -1503,10 +1505,39 @@ func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSe
return ring.DoUntilQuorum(ctx, replicationSet, d.queryQuorumConfig(ctx, replicationSet), wrappedF, cleanup)
}

// queryQuorumConfig returns the config to use with "do until quorum" functions when running queries.
//
// Deprecated: use queryQuorumConfigForReplicationSets() instead.
func (d *Distributor) queryQuorumConfig(ctx context.Context, replicationSet ring.ReplicationSet) ring.DoUntilQuorumConfig {
	// Thin wrapper kept for backward compatibility: delegate to the multi-set
	// variant with a single-element slice. (The previous `logger` local was
	// unused and would not compile; the delegate builds its own logger.)
	return d.queryQuorumConfigForReplicationSets(ctx, []ring.ReplicationSet{replicationSet})
}

// queryQuorumConfigForReplicationSets returns the config to use with "do until quorum" functions when running queries.
func (d *Distributor) queryQuorumConfigForReplicationSets(ctx context.Context, replicationSets []ring.ReplicationSet) ring.DoUntilQuorumConfig {
	var zoneSorter ring.ZoneSorter

	switch {
	case d.cfg.IngestStorageConfig.Enabled:
		// With ingest storage enabled, sort zones by the configured preference.
		zoneSorter = queryIngesterPartitionsRingZoneSorter(d.cfg.PreferAvailabilityZone)

	case len(replicationSets) == 1:
		// We expect to always have exactly 1 replication set when ingest storage is disabled.
		// To keep the code safer, zoneSorter is left nil (no sorting) if that's not the case.
		zoneSorter = queryIngestersRingZoneSorter(replicationSets[0])
	}

	return ring.DoUntilQuorumConfig{
		MinimizeRequests: d.cfg.MinimizeIngesterRequests,
		HedgingDelay:     d.cfg.MinimiseIngesterRequestsHedgingDelay,
		ZoneSorter:       zoneSorter,
		Logger:           spanlogger.FromContext(ctx, d.log),
	}
}

zoneSorter := func(zones []string) []string {
// queryIngestersRingZoneSorter returns a ring.ZoneSorter that should be used to sort ingester zones
// to attempt to query first, when ingest storage is disabled.
func queryIngestersRingZoneSorter(replicationSet ring.ReplicationSet) ring.ZoneSorter {
return func(zones []string) []string {
inactiveCount := make(map[string]int, len(zones))

for _, i := range replicationSet.Instances {
Expand All @@ -1521,18 +1552,39 @@ func (d *Distributor) queryQuorumConfig(ctx context.Context, replicationSet ring

return zones
}
}

return ring.DoUntilQuorumConfig{
MinimizeRequests: d.cfg.MinimizeIngesterRequests,
HedgingDelay: d.cfg.MinimiseIngesterRequestsHedgingDelay,
ZoneSorter: zoneSorter,
Logger: logger,
// queryIngesterPartitionsRingZoneSorter returns a ring.ZoneSorter that should be used to sort
// ingester zones to attempt to query first, when ingest storage is enabled.
//
// The sorter gives preference to preferredZone if non empty, and then randomizes the other zones.
func queryIngesterPartitionsRingZoneSorter(preferredZone string) ring.ZoneSorter {
	return func(zones []string) []string {
		// Randomize the order to spread query load across zones. The shuffle is
		// skipped when it cannot change the observable outcome: a single zone, or
		// exactly two zones of which one is pinned to the front below.
		shuffleNeeded := len(zones) > 2 || (preferredZone == "" && len(zones) > 1)
		if shuffleNeeded {
			rand.Shuffle(len(zones), func(a, b int) {
				zones[a], zones[b] = zones[b], zones[a]
			})
		}

		if preferredZone == "" {
			return zones
		}

		// Move the preferred zone (if present) to the front so it's tried first.
		for idx := range zones {
			if zones[idx] == preferredZone {
				zones[0], zones[idx] = zones[idx], zones[0]
				break
			}
		}

		return zones
	}
}

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1569,7 +1621,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode

// LabelNamesAndValues query ingesters for label names and values and returns labels with distinct list of values.
func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1722,7 +1775,8 @@ func (d *Distributor) LabelValuesCardinality(ctx context.Context, labelNames []m
// labelValuesCardinality queries ingesters for label values cardinality of a set of labelNames
// Returns a LabelValuesCardinalityResponse where each item contains an exclusive label name and associated label values
func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []model.LabelName, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelValuesCardinalityResponse, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1872,7 +1926,8 @@ func (cm *labelValuesCardinalityConcurrentMap) toLabelValuesCardinalityResponse(
// ActiveSeries queries the ingester replication set for active series matching
// the given selector. It combines and deduplicates the results.
func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Matcher) ([]labels.Labels, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2080,7 +2135,8 @@ func approximateFromZones[T ~float64 | ~uint64](zoneCount int, replicationFactor

// LabelNames returns all of the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2116,7 +2172,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2155,7 +2212,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

// MetricsMetadata returns all metric metadata of a user.
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2192,7 +2250,8 @@ func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context, countMethod cardinality.CountMethod) (*UserStats, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down
89 changes: 74 additions & 15 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2530,7 +2530,8 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
require.NoError(t, err)

// Check how many ingesters are queried as part of the shuffle sharding subring.
replicationSet, err := ds[0].GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := ds[0].getIngesterReplicationSetForQuery(ctx)
require.NoError(t, err)
assert.Equal(t, testData.expectedIngesters, len(replicationSet.Instances))

Expand Down Expand Up @@ -3567,13 +3568,14 @@ type prepConfig struct {
// ingesterStateByZone supersedes numIngesters, happyIngesters, and ingesterZones
ingesterStateByZone map[string]ingesterZoneState

// ingesterDataPerZone:
// ingesterDataByZone:
// map[zone-a][0] -> ingester-zone-a-0 write request
// map[zone-a][1] -> ingester-zone-a-1 write request
// Each zone in ingesterDataPerZone can be shorter than the actual number of ingesters for the zone, but it cannot be longer.
// Each zone in ingesterDataByZone can be shorter than the actual number of ingesters for the zone, but it cannot be longer.
// If a request is nil, sending a request to the ingester is skipped.
ingesterDataPerZone map[string][]*mimirpb.WriteRequest
// ingesterDataTenantID is the tenant under which ingesterDataPerZone is pushed
ingesterDataByZone map[string][]*mimirpb.WriteRequest

// ingesterDataTenantID is the tenant under which ingesterDataByZone is pushed
ingesterDataTenantID string

queryDelay time.Duration
Expand Down Expand Up @@ -3649,7 +3651,7 @@ func (c prepConfig) validate(t testing.TB) {
if len(state.ringStates) > 0 {
require.Len(t, state.ringStates, ingestersInZone, "ringStates cannot be longer than the number of ingesters in the zone")
}
require.LessOrEqual(t, len(c.ingesterDataPerZone[zone]), ingestersInZone, "ingesterDataPerZone cannot be longer than the number of ingesters in the zone")
require.LessOrEqual(t, len(c.ingesterDataByZone[zone]), ingestersInZone, "ingesterDataPerZone cannot be longer than the number of ingesters in the zone")
}
}
}
Expand Down Expand Up @@ -3921,8 +3923,8 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*
})
}

if len(cfg.ingesterDataPerZone) != 0 {
populateIngestersData(t, ingesters, cfg.ingesterDataPerZone, cfg.ingesterDataTenantID)
if len(cfg.ingesterDataByZone) != 0 {
populateIngestersData(t, ingesters, cfg.ingesterDataByZone, cfg.ingesterDataTenantID)
}

return distributors, ingesters, distributorRegistries, kafkaCluster
Expand Down Expand Up @@ -5996,7 +5998,7 @@ func TestSendMessageMetadata(t *testing.T) {
require.Equal(t, []string{strconv.Itoa(req.Size())}, mock.md[grpcutil.MetadataMessageSize])
}

func TestQueryQuorumConfig_ZoneSorting(t *testing.T) {
func TestQueryIngestersRingZoneSorter(t *testing.T) {
testCases := map[string]struct {
instances []ring.InstanceDesc
verify func(t *testing.T, sortedZones []string)
Expand Down Expand Up @@ -6102,20 +6104,77 @@ func TestQueryQuorumConfig_ZoneSorting(t *testing.T) {
},
}

d := &Distributor{
cfg: Config{},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
replicationSet := ring.ReplicationSet{
Instances: testCase.instances,
ZoneAwarenessEnabled: true,
}

cfg := d.queryQuorumConfig(context.Background(), replicationSet)
sorted := cfg.ZoneSorter(uniqueZones(testCase.instances))
sorted := queryIngestersRingZoneSorter(replicationSet)(uniqueZones(testCase.instances))
testCase.verify(t, sorted)
})
}
}

func TestQueryIngesterPartitionsRingZoneSorter(t *testing.T) {
testCases := map[string]struct {
zones []string
preferredZone string
verify func(t *testing.T, sortedZones []string)
}{
"no zones": {
zones: []string{},
verify: func(t *testing.T, sortedZones []string) {
require.Empty(t, sortedZones)
},
},
"one zone, without preferred zone": {
zones: []string{"zone-a"},
verify: func(t *testing.T, sortedZones []string) {
require.Equal(t, []string{"zone-a"}, sortedZones)
},
},
"one zone, with preferred zone": {
zones: []string{"zone-a"},
preferredZone: "zone-a",
verify: func(t *testing.T, sortedZones []string) {
require.Equal(t, []string{"zone-a"}, sortedZones)
},
},
"two zones, without preferred zone": {
zones: []string{"zone-a", "zone-b"},
verify: func(t *testing.T, sortedZones []string) {
require.ElementsMatch(t, []string{"zone-a", "zone-b"}, sortedZones)
},
},
"two zones, with preferred zone": {
zones: []string{"zone-a", "zone-b"},
preferredZone: "zone-b",
verify: func(t *testing.T, sortedZones []string) {
require.Equal(t, []string{"zone-b", "zone-a"}, sortedZones)
},
},
"many zones, without preferred zone": {
zones: []string{"zone-a", "zone-b", "zone-c", "zone-d"},
verify: func(t *testing.T, sortedZones []string) {
require.ElementsMatch(t, []string{"zone-a", "zone-b", "zone-c", "zone-d"}, sortedZones)
},
},
"many zones, with preferred zone": {
zones: []string{"zone-a", "zone-b", "zone-c", "zone-d"},
preferredZone: "zone-b",
verify: func(t *testing.T, sortedZones []string) {
require.Len(t, sortedZones, 4)
require.Equal(t, "zone-b", sortedZones[0])
require.ElementsMatch(t, []string{"zone-a", "zone-c", "zone-d"}, sortedZones[1:])
},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
sorted := queryIngesterPartitionsRingZoneSorter(testCase.preferredZone)(testCase.zones)
testCase.verify(t, sorted)
})
}
Expand Down
Loading

0 comments on commit 3414ac9

Please sign in to comment.