Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distributor: use pointers to mockIngester in tests #7223

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 36 additions & 37 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,11 @@ func TestDistributor_ContextCanceledRequest(t *testing.T) {
})

// Lock all mockIngester instances, so they will be waiting
for i := range ings {
ings[i].Lock()
for _, ing := range ings {
ing.Lock()
defer func(ing *mockIngester) {
ing.Unlock()
}(&ings[i])
}(ing)
}

ctx := user.InjectOrgID(context.Background(), "user")
Expand Down Expand Up @@ -3571,15 +3571,15 @@ func (c prepConfig) validate(t testing.TB) {
}
}

func prepareIngesters(cfg prepConfig) []mockIngester {
func prepareIngesters(cfg prepConfig) []*mockIngester {
if len(cfg.ingesterStateByZone) != 0 {
ingesters := []mockIngester(nil)
ingesters := []*mockIngester(nil)
for zone, state := range cfg.ingesterStateByZone {
ingesters = append(ingesters, prepareIngesterZone(zone, state, cfg)...)
}
return ingesters
}
ingesters := []mockIngester(nil)
ingesters := []*mockIngester(nil)
numZones := len(cfg.ingesterZones)
if numZones == 0 {
return prepareIngesterZone("", ingesterZoneState{numIngesters: cfg.numIngesters, happyIngesters: cfg.happyIngesters}, cfg)
Expand Down Expand Up @@ -3609,8 +3609,8 @@ func prepareIngesters(cfg prepConfig) []mockIngester {

}

func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) []mockIngester {
ingesters := []mockIngester(nil)
func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) []*mockIngester {
ingesters := []*mockIngester(nil)

if state.states == nil {
state.states = make([]ingesterState, state.numIngesters)
Expand All @@ -3627,7 +3627,7 @@ func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) [
if len(cfg.labelNamesStreamZonesResponseDelay) > 0 {
labelNamesStreamResponseDelay = cfg.labelNamesStreamZonesResponseDelay[zone]
}
ingesters = append(ingesters, mockIngester{
ingesters = append(ingesters, &mockIngester{
id: i,
happy: s == ingesterStateHappy,
queryDelay: cfg.queryDelay,
Expand All @@ -3641,7 +3641,7 @@ func prepareIngesterZone(zone string, state ingesterZoneState, cfg prepConfig) [
return ingesters
}

func prepareRingInstances(cfg prepConfig, ingesters []mockIngester) *ring.Desc {
func prepareRingInstances(cfg prepConfig, ingesters []*mockIngester) *ring.Desc {
ingesterDescs := map[string]ring.InstanceDesc{}

for i := range ingesters {
Expand All @@ -3660,7 +3660,7 @@ func prepareRingInstances(cfg prepConfig, ingesters []mockIngester) *ring.Desc {
return &ring.Desc{Ingesters: ingesterDescs}
}

func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry) {
func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry) {
cfg.validate(t)

logger := log.NewNopLogger()
Expand Down Expand Up @@ -3697,8 +3697,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
})

factory := ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
for i := range ingesters {
ing := &ingesters[i] // Take a pointer, so we don't copy the sync.Mutex in the mockIngester
for _, ing := range ingesters {
if ing.address() == inst.Addr {
return ing, nil
}
Expand Down Expand Up @@ -3783,13 +3782,13 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
return distributors, ingesters, registries
}

func populateIngestersData(t testing.TB, ingesters []mockIngester, dataPerZone map[string][]*mimirpb.WriteRequest, tenantID string) {
func populateIngestersData(t testing.TB, ingesters []*mockIngester, dataPerZone map[string][]*mimirpb.WriteRequest, tenantID string) {
ctx := user.InjectOrgID(context.Background(), tenantID)

findIngester := func(zone string, id int) *mockIngester {
for i := range ingesters {
if ingesters[i].zone == zone && ingesters[i].id == id {
return &ingesters[i]
for _, ing := range ingesters {
if ing.zone == zone && ing.id == id {
return ing
}
}
panic("pushing to non-existent ingester; prepConfig.validate() should have caught this")
Expand Down Expand Up @@ -4953,10 +4952,10 @@ func TestDistributor_Push_Relabel(t *testing.T) {
}
}

func countMockIngestersCalled(ingesters []mockIngester, name string) int {
func countMockIngestersCalled(ingesters []*mockIngester, name string) int {
count := 0
for i := 0; i < len(ingesters); i++ {
if ingesters[i].countCalls(name) > 0 {
for _, i := range ingesters {
if i.countCalls(name) > 0 {
count++
}
}
Expand Down Expand Up @@ -5383,7 +5382,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
numDistributors: 1,
replicationFactor: 1, // push each series to single ingester only
}
d, ing, _ := prepare(t, config)
d, ingesters, _ := prepare(t, config)

uniqueMetricsGen := func(sampleIdx int) []mimirpb.LabelAdapter {
return []mimirpb.LabelAdapter{
Expand Down Expand Up @@ -5417,20 +5416,20 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
// Verify that each ingester only received series and metadata that it should receive.
totalSeries := 0
totalMetadata := 0
for ix := range ing {
totalSeries += len(ing[ix].timeseries)
totalMetadata += len(ing[ix].metadata)
for ix, ing := range ingesters {
totalSeries += len(ing.timeseries)
totalMetadata += len(ing.metadata)

for _, ts := range ing[ix].timeseries {
for _, ts := range ing.timeseries {
token := tokenForLabels(userName, ts.Labels)
ingIx := getIngesterIndexForToken(token, ing)
ingIx := getIngesterIndexForToken(token, ingesters)
assert.Equal(t, ix, ingIx)
}

for _, metadataMap := range ing[ix].metadata {
for _, metadataMap := range ing.metadata {
for m := range metadataMap {
token := tokenForMetadata(userName, m.MetricFamilyName)
ingIx := getIngesterIndexForToken(token, ing)
ingIx := getIngesterIndexForToken(token, ingesters)
assert.Equal(t, ix, ingIx)
}
}
Expand All @@ -5439,9 +5438,9 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
// Verify that all timeseries were forwarded to ingesters.
for _, ts := range req.Timeseries {
token := tokenForLabels(userName, ts.Labels)
ingIx := getIngesterIndexForToken(token, ing)
ingIx := getIngesterIndexForToken(token, ingesters)

assert.Equal(t, ts.Labels, ing[ingIx].timeseries[token].Labels)