Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partitions ring support to Distributor.QueryStream() #7388

Merged
merged 7 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20240214175438-3cabc9619ece
github.com/grafana/dskit v0.0.0-20240215093740-1ff57c889843
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20240214175438-3cabc9619ece h1:4t5jKvI8Fq13ozG9gCkOMlvECl82EbO2COnniHUP+Uc=
github.com/grafana/dskit v0.0.0-20240214175438-3cabc9619ece/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM=
github.com/grafana/dskit v0.0.0-20240215093740-1ff57c889843 h1:WtI6smDyCqXanrPzhHd6u9EfRJHOuUm0GoujJoPMe6s=
github.com/grafana/dskit v0.0.0-20240215093740-1ff57c889843/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
Expand Down
89 changes: 74 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -196,6 +197,7 @@ type Config struct {
StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"-"`
MinimizeIngesterRequests bool `yaml:"-"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"-"`
PreferAvailabilityZone string `yaml:"-"`

// IngestStorageConfig is dynamically injected because defined outside of distributor config.
IngestStorageConfig ingest.Config `yaml:"-"`
Expand Down Expand Up @@ -1503,10 +1505,39 @@ func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSe
return ring.DoUntilQuorum(ctx, replicationSet, d.queryQuorumConfig(ctx, replicationSet), wrappedF, cleanup)
}

// queryQuorumConfig returns the config to use with "do until quorum" functions when running queries.
//
// Deprecated: use queryQuorumConfigForReplicationSets() instead.
func (d *Distributor) queryQuorumConfig(ctx context.Context, replicationSet ring.ReplicationSet) ring.DoUntilQuorumConfig {
logger := spanlogger.FromContext(ctx, d.log)
return d.queryQuorumConfigForReplicationSets(ctx, []ring.ReplicationSet{replicationSet})
}

// queryQuorumConfigForReplicationSets returns the config to use with "do until quorum" functions when running queries.
func (d *Distributor) queryQuorumConfigForReplicationSets(ctx context.Context, replicationSets []ring.ReplicationSet) ring.DoUntilQuorumConfig {
var zoneSorter ring.ZoneSorter

if d.cfg.IngestStorageConfig.Enabled {
zoneSorter = queryIngesterPartitionsRingZoneSorter(d.cfg.PreferAvailabilityZone)
} else {
// We expect to always have exactly 1 replication set when ingest storage is disabled.
// To keep the code safer, we run with no zone sorter if that's not the case.
if len(replicationSets) == 1 {
zoneSorter = queryIngestersRingZoneSorter(replicationSets[0])
}
Comment on lines +1524 to +1526
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick, but i'd even panic if that's not the case

}

return ring.DoUntilQuorumConfig{
MinimizeRequests: d.cfg.MinimizeIngesterRequests,
HedgingDelay: d.cfg.MinimiseIngesterRequestsHedgingDelay,
ZoneSorter: zoneSorter,
Logger: spanlogger.FromContext(ctx, d.log),
}
}

zoneSorter := func(zones []string) []string {
// queryIngestersRingZoneSorter returns a ring.ZoneSorter that should be used to sort ingester zones
// to attempt to query first, when ingest storage is disabled.
func queryIngestersRingZoneSorter(replicationSet ring.ReplicationSet) ring.ZoneSorter {
return func(zones []string) []string {
inactiveCount := make(map[string]int, len(zones))

for _, i := range replicationSet.Instances {
Expand All @@ -1521,18 +1552,39 @@ func (d *Distributor) queryQuorumConfig(ctx context.Context, replicationSet ring

return zones
}
}

return ring.DoUntilQuorumConfig{
MinimizeRequests: d.cfg.MinimizeIngesterRequests,
HedgingDelay: d.cfg.MinimiseIngesterRequestsHedgingDelay,
ZoneSorter: zoneSorter,
Logger: logger,
// queryIngesterPartitionsRingZoneSorter returns a ring.ZoneSorter that should be used to sort
// ingester zones to attempt to query first, when ingest storage is enabled.
//
// The sorter gives preference to preferredZone if non empty, and then randomize the other zones.
func queryIngesterPartitionsRingZoneSorter(preferredZone string) ring.ZoneSorter {
return func(zones []string) []string {
// Shuffle the zones to distribute load evenly.
if len(zones) > 2 || (preferredZone == "" && len(zones) > 1) {
rand.Shuffle(len(zones), func(i, j int) {
zones[i], zones[j] = zones[j], zones[i]
})
}

if preferredZone != "" {
// Give priority to the preferred zone.
for i, z := range zones {
if z == preferredZone {
zones[0], zones[i] = zones[i], zones[0]
break
}
}
}

return zones
}
}

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this because of the deprecated notice?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of deprecated functions will disappear as soon as I complete this work (over the course of few more PRs).

replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1569,7 +1621,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode

// LabelNamesAndValues query ingesters for label names and values and returns labels with distinct list of values.
func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1722,7 +1775,8 @@ func (d *Distributor) LabelValuesCardinality(ctx context.Context, labelNames []m
// labelValuesCardinality queries ingesters for label values cardinality of a set of labelNames
// Returns a LabelValuesCardinalityResponse where each item contains an exclusive label name and associated label values
func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []model.LabelName, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelValuesCardinalityResponse, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1872,7 +1926,8 @@ func (cm *labelValuesCardinalityConcurrentMap) toLabelValuesCardinalityResponse(
// ActiveSeries queries the ingester replication set for active series matching
// the given selector. It combines and deduplicates the results.
func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Matcher) ([]labels.Labels, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2080,7 +2135,8 @@ func approximateFromZones[T ~float64 | ~uint64](zoneCount int, replicationFactor

// LabelNames returns all of the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2116,7 +2172,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2155,7 +2212,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

// MetricsMetadata returns all metric metadata of a user.
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2192,7 +2250,8 @@ func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context, countMethod cardinality.CountMethod) (*UserStats, error) {
replicationSet, err := d.GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
if err != nil {
return nil, err
}
Expand Down
91 changes: 75 additions & 16 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2530,7 +2530,8 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
require.NoError(t, err)

// Check how many ingesters are queried as part of the shuffle sharding subring.
replicationSet, err := ds[0].GetIngesters(ctx)
//nolint:staticcheck
replicationSet, err := ds[0].getIngesterReplicationSetForQuery(ctx)
require.NoError(t, err)
assert.Equal(t, testData.expectedIngesters, len(replicationSet.Instances))

Expand Down Expand Up @@ -3567,13 +3568,14 @@ type prepConfig struct {
// ingesterStateByZone supersedes numIngesters, happyIngesters, and ingesterZones
ingesterStateByZone map[string]ingesterZoneState

// ingesterDataPerZone:
// ingesterDataByZone:
// map[zone-a][0] -> ingester-zone-a-0 write request
// map[zone-a][1] -> ingester-zone-a-1 write request
// Each zone in ingesterDataPerZone can be shorter than the actual number of ingesters for the zone, but it cannot be longer.
// Each zone in ingesterDataByZone can be shorter than the actual number of ingesters for the zone, but it cannot be longer.
// If a request is nil, sending a request to the ingester is skipped.
ingesterDataPerZone map[string][]*mimirpb.WriteRequest
// ingesterDataTenantID is the tenant under which ingesterDataPerZone is pushed
ingesterDataByZone map[string][]*mimirpb.WriteRequest

// ingesterDataTenantID is the tenant under which ingesterDataByZone is pushed
ingesterDataTenantID string

queryDelay time.Duration
Expand Down Expand Up @@ -3649,7 +3651,7 @@ func (c prepConfig) validate(t testing.TB) {
if len(state.ringStates) > 0 {
require.Len(t, state.ringStates, ingestersInZone, "ringStates cannot be longer than the number of ingesters in the zone")
}
require.LessOrEqual(t, len(c.ingesterDataPerZone[zone]), ingestersInZone, "ingesterDataPerZone cannot be longer than the number of ingesters in the zone")
require.LessOrEqual(t, len(c.ingesterDataByZone[zone]), ingestersInZone, "ingesterDataPerZone cannot be longer than the number of ingesters in the zone")
}
}
}
Expand Down Expand Up @@ -3734,7 +3736,7 @@ func prepareRingInstances(cfg prepConfig, ingesters []*mockIngester) *ring.Desc
ingesterDescs[addr] = ring.InstanceDesc{
Addr: addr,
Zone: ingesters[i].zone,
State: cfg.ingesterRingState(ingesters[i].zone, i),
State: cfg.ingesterRingState(ingesters[i].zone, ingesters[i].id),
Timestamp: time.Now().Unix(),
RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(), // registered before the shuffle sharding lookback period, so we don't start including other ingesters
Tokens: tokens,
Expand Down Expand Up @@ -3921,8 +3923,8 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*
})
}

if len(cfg.ingesterDataPerZone) != 0 {
populateIngestersData(t, ingesters, cfg.ingesterDataPerZone, cfg.ingesterDataTenantID)
if len(cfg.ingesterDataByZone) != 0 {
populateIngestersData(t, ingesters, cfg.ingesterDataByZone, cfg.ingesterDataTenantID)
}

return distributors, ingesters, distributorRegistries, kafkaCluster
Expand Down Expand Up @@ -5996,7 +5998,7 @@ func TestSendMessageMetadata(t *testing.T) {
require.Equal(t, []string{strconv.Itoa(req.Size())}, mock.md[grpcutil.MetadataMessageSize])
}

func TestQueryQuorumConfig_ZoneSorting(t *testing.T) {
func TestQueryIngestersRingZoneSorter(t *testing.T) {
testCases := map[string]struct {
instances []ring.InstanceDesc
verify func(t *testing.T, sortedZones []string)
Expand Down Expand Up @@ -6102,20 +6104,77 @@ func TestQueryQuorumConfig_ZoneSorting(t *testing.T) {
},
}

d := &Distributor{
cfg: Config{},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
replicationSet := ring.ReplicationSet{
Instances: testCase.instances,
ZoneAwarenessEnabled: true,
}

cfg := d.queryQuorumConfig(context.Background(), replicationSet)
sorted := cfg.ZoneSorter(uniqueZones(testCase.instances))
sorted := queryIngestersRingZoneSorter(replicationSet)(uniqueZones(testCase.instances))
testCase.verify(t, sorted)
})
}
}

func TestQueryIngesterPartitionsRingZoneSorter(t *testing.T) {
testCases := map[string]struct {
zones []string
preferredZone string
verify func(t *testing.T, sortedZones []string)
}{
"no zones": {
zones: []string{},
verify: func(t *testing.T, sortedZones []string) {
require.Empty(t, sortedZones)
},
},
"one zone, without preferred zone": {
zones: []string{"zone-a"},
verify: func(t *testing.T, sortedZones []string) {
require.Equal(t, []string{"zone-a"}, sortedZones)
},
},
"one zone, with preferred zone": {
zones: []string{"zone-a"},
preferredZone: "zone-a",
verify: func(t *testing.T, sortedZones []string) {
require.Equal(t, []string{"zone-a"}, sortedZones)
},
},
"two zones, without preferred zone": {
zones: []string{"zone-a", "zone-b"},
verify: func(t *testing.T, sortedZones []string) {
require.ElementsMatch(t, []string{"zone-a", "zone-b"}, sortedZones)
},
},
"two zones, with preferred zone": {
zones: []string{"zone-a", "zone-b"},
preferredZone: "zone-b",
verify: func(t *testing.T, sortedZones []string) {
require.Equal(t, []string{"zone-b", "zone-a"}, sortedZones)
},
},
"many zones, without preferred zone": {
zones: []string{"zone-a", "zone-b", "zone-c", "zone-d"},
verify: func(t *testing.T, sortedZones []string) {
require.ElementsMatch(t, []string{"zone-a", "zone-b", "zone-c", "zone-d"}, sortedZones)
},
},
"many zones, with preferred zone": {
zones: []string{"zone-a", "zone-b", "zone-c", "zone-d"},
preferredZone: "zone-b",
verify: func(t *testing.T, sortedZones []string) {
require.Len(t, sortedZones, 4)
require.Equal(t, "zone-b", sortedZones[0])
require.ElementsMatch(t, []string{"zone-a", "zone-c", "zone-d"}, sortedZones[1:])
},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
sorted := queryIngesterPartitionsRingZoneSorter(testCase.preferredZone)(testCase.zones)
testCase.verify(t, sorted)
})
}
Expand Down
Loading
Loading