diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 4d4e359ae60..fbd8632aead 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -4836,6 +4836,10 @@ type prepConfig struct {
 	ingestStoragePartitions int32          // Number of partitions. Auto-detected from configured ingesters if not explicitly set.
 	ingestStorageKafka      *kfake.Cluster
+	// We need this setting to simulate responses from ingesters that don't support responding
+	// with a stream of chunks and respond with chunk series instead. This is needed to ensure
+	// backwards compatibility, i.e., that queriers can still correctly handle both types of
+	// responses.
 	disableStreamingResponse bool
 }
@@ -5805,7 +5809,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
 		}
 	}
 
-	if i.disableStreamingResponse {
+	if i.disableStreamingResponse || req.StreamingChunksBatchSize == 0 {
 		nonStreamingResponses = append(nonStreamingResponses, &client.QueryStreamResponse{
 			Chunkseries: []client.TimeSeriesChunk{
 				{
diff --git a/pkg/distributor/query_ingest_storage_test.go b/pkg/distributor/query_ingest_storage_test.go
index 96df57b8f4c..8f0a726c00a 100644
--- a/pkg/distributor/query_ingest_storage_test.go
+++ b/pkg/distributor/query_ingest_storage_test.go
@@ -12,7 +12,6 @@ import (
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/test"
 	"github.com/grafana/dskit/user"
-	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/assert"
@@ -489,71 +488,62 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {
 	for testName, testData := range tests {
 		testData := testData
 
-		for _, disableStreamingResponse := range []bool{false, true} {
-			disableStreamingResponse := disableStreamingResponse
-			t.Run(fmt.Sprintf("%s, streaming response disabled: %v", testName, disableStreamingResponse), func(t *testing.T) {
-				t.Parallel()
-				limits := prepareDefaultLimits()
-				limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
+		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
 
-				cfg := prepConfig{
-					numDistributors:          1,
-					ingestStorageEnabled:     true,
-					ingesterStateByZone:      testData.ingesterStateByZone,
-					ingesterDataByZone:       testData.ingesterDataByZone,
-					ingesterDataTenantID:     tenantID,
-					queryDelay:               250 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
-					replicationFactor:        1, // Ingest storage is not expected to use it.
-					limits:                   limits,
-					disableStreamingResponse: disableStreamingResponse,
-					configure: func(config *Config) {
-						config.PreferAvailabilityZone = testData.preferZone
-						config.MinimizeIngesterRequests = testData.minimizeIngesterRequests
-					},
-				}
+			limits := prepareDefaultLimits()
+			limits.IngestionPartitionsTenantShardSize = testData.shuffleShardSize
 
-				distributors, ingesters, distributorRegistries, _ := prepare(t, cfg)
-				require.Len(t, distributors, 1)
-				require.Len(t, distributorRegistries, 1)
+			cfg := prepConfig{
+				numDistributors:      1,
+				ingestStorageEnabled: true,
+				ingesterStateByZone:  testData.ingesterStateByZone,
+				ingesterDataByZone:   testData.ingesterDataByZone,
+				ingesterDataTenantID: tenantID,
+				queryDelay:           250 * time.Millisecond, // Give some time to start the calls to all ingesters before failures are received.
+				replicationFactor:    1, // Ingest storage is not expected to use it.
+				limits:               limits,
+				configure: func(config *Config) {
+					config.PreferAvailabilityZone = testData.preferZone
+					config.MinimizeIngesterRequests = testData.minimizeIngesterRequests
+				},
+			}
 
-				// Query ingesters.
-				queryMetrics := stats.NewQueryMetrics(distributorRegistries[0])
-				resp, err := distributors[0].QueryStream(ctx, queryMetrics, 0, 10, testData.matchers...)
+			distributors, ingesters, distributorRegistries, _ := prepare(t, cfg)
+			require.Len(t, distributors, 1)
+			require.Len(t, distributorRegistries, 1)
 
-				if testData.expectedErr == nil {
-					require.NoError(t, err)
-				} else {
-					assert.ErrorIs(t, err, testData.expectedErr)
+			// Query ingesters.
+			queryMetrics := stats.NewQueryMetrics(distributorRegistries[0])
+			resp, err := distributors[0].QueryStream(ctx, queryMetrics, 0, 10, testData.matchers...)
 
-					// Assert that downstream gRPC statuses are passed back upstream.
-					_, expectedIsGRPC := grpcutil.ErrorToStatus(testData.expectedErr)
-					if expectedIsGRPC {
-						_, actualIsGRPC := grpcutil.ErrorToStatus(err)
-						assert.True(t, actualIsGRPC, fmt.Sprintf("expected error to be a status error, but got: %T", err))
-					}
-				}
+			if testData.expectedErr == nil {
+				require.NoError(t, err)
+			} else {
+				assert.ErrorIs(t, err, testData.expectedErr)
 
-				var responseMatrix model.Matrix
-				if len(resp.Chunkseries) == 0 {
-					responseMatrix, err = ingester_client.StreamingSeriesToMatrix(0, 5, resp.StreamingSeries)
-				} else {
-					responseMatrix, err = ingester_client.TimeSeriesChunksToMatrix(0, 5, resp.Chunkseries)
+				// Assert that downstream gRPC statuses are passed back upstream.
+				_, expectedIsGRPC := grpcutil.ErrorToStatus(testData.expectedErr)
+				if expectedIsGRPC {
+					_, actualIsGRPC := grpcutil.ErrorToStatus(err)
+					assert.True(t, actualIsGRPC, fmt.Sprintf("expected error to be a status error, but got: %T", err))
 				}
-				assert.NoError(t, err)
-				assert.Equal(t, testData.expectedResponse.String(), responseMatrix.String())
+			}
 
-				// Check how many ingesters have been queried.
-				// Because we return immediately on failures, it might take some time for all ingester calls to register.
-				test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })
+			var responseMatrix model.Matrix
+			if len(resp.Chunkseries) == 0 {
+				responseMatrix, err = ingester_client.StreamingSeriesToMatrix(0, 5, resp.StreamingSeries)
+			} else {
+				responseMatrix, err = ingester_client.TimeSeriesChunksToMatrix(0, 5, resp.Chunkseries)
+			}
+			assert.NoError(t, err)
+			assert.Equal(t, testData.expectedResponse.String(), responseMatrix.String())
 
-				if disableStreamingResponse {
-					// We expected the number of non-deduplicated chunks to be equal to the number of queried series
-					// given we expect 1 chunk per series.
-					assert.Equal(t, float64(testData.expectedResponse.Len()), testutil.ToFloat64(queryMetrics.IngesterChunksTotal)-testutil.ToFloat64(queryMetrics.IngesterChunksDeduplicated))
-				}
-			})
-		}
+			// Check how many ingesters have been queried.
+			// Because we return immediately on failures, it might take some time for all ingester calls to register.
+			test.Poll(t, 4*cfg.queryDelay, testData.expectedQueriedIngesters, func() any { return countMockIngestersCalls(ingesters, "QueryStream") })
+		})
 	}
 }
diff --git a/pkg/ingester/client/chunkcompat.go b/pkg/ingester/client/chunkcompat.go
index a21a8d9d527..f34521a4a4a 100644
--- a/pkg/ingester/client/chunkcompat.go
+++ b/pkg/ingester/client/chunkcompat.go
@@ -73,18 +73,21 @@ func StreamingSeriesToMatrix(from, through model.Time, sSeries []StreamingSeries
 	}
 
 	result := model.Matrix{}
+	var chunks []Chunk
 	for _, series := range sSeries {
+		chunks = chunks[:0]
 		for sourceIdx, source := range series.Sources {
-			chunks, err := source.StreamReader.GetChunks(source.SeriesIndex)
+			sourceChunks, err := source.StreamReader.GetChunks(source.SeriesIndex)
 			if err != nil {
 				return nil, fmt.Errorf("GetChunks() from stream reader for series %d from source %d: %w", source.SeriesIndex, sourceIdx, err)
 			}
-			stream, err := seriesChunksToMatrix(from, through, series.Labels, chunks)
-			if err != nil {
-				return nil, err
-			}
-			result = append(result, stream)
+			chunks = append(chunks, sourceChunks...)
 		}
+		stream, err := seriesChunksToMatrix(from, through, series.Labels, chunks)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, stream)
 	}
 	return result, nil
 }
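
Note on the mockIngester change: the fallback to a non-streaming response now also triggers when the request itself opts out of chunk streaming (StreamingChunksBatchSize == 0), not only when a test sets disableStreamingResponse. A minimal sketch of that dispatch, using hypothetical stand-in types rather than Mimir's actual client package:

package main

import "fmt"

// queryRequest stands in for client.QueryRequest; only the field relevant
// to the fallback is modeled here.
type queryRequest struct {
	StreamingChunksBatchSize uint64 // 0 means the querier doesn't want streamed chunks
}

// responseKind mimics the mock ingester's choice between the legacy
// chunk-series response and the streaming response.
func responseKind(disableStreamingResponse bool, req queryRequest) string {
	if disableStreamingResponse || req.StreamingChunksBatchSize == 0 {
		return "chunkseries" // legacy, non-streaming response
	}
	return "streaming" // series labels first, then batches of chunks
}

func main() {
	fmt.Println(responseKind(false, queryRequest{StreamingChunksBatchSize: 64})) // streaming
	fmt.Println(responseKind(false, queryRequest{}))                             // chunkseries
	fmt.Println(responseKind(true, queryRequest{StreamingChunksBatchSize: 64}))  // chunkseries
}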
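
Note on the StreamingSeriesToMatrix change: chunks from all sources of a streaming series are now merged into one scratch slice and converted to a matrix stream once per series, instead of appending one stream per source, so the resulting matrix gets a single entry per series. A minimal sketch of the slice-reuse pattern, with simplified stand-in types:

package main

import "fmt"

// mergePerSeries demonstrates the pattern from StreamingSeriesToMatrix:
// one scratch slice is truncated with s = s[:0] on every outer iteration,
// so chunks from all sources of a series are gathered before a single
// conversion step, and the backing array is reused across series.
func mergePerSeries(seriesSources [][][]int) [][]int {
	var scratch []int // reused across series; keeps its capacity between iterations
	var result [][]int

	for _, sources := range seriesSources {
		scratch = scratch[:0] // drop the previous series' chunks, keep the allocation
		for _, sourceChunks := range sources {
			scratch = append(scratch, sourceChunks...)
		}
		// One "conversion" per series, covering chunks from every source.
		// The copy matters here: scratch's backing array is overwritten on
		// the next iteration, so the result must not alias it.
		merged := make([]int, len(scratch))
		copy(merged, scratch)
		result = append(result, merged)
	}
	return result
}

func main() {
	series := [][][]int{
		{{1, 2}, {3}},    // series 0: chunks from two sources
		{{4}, {5, 6, 7}}, // series 1: chunks from two sources
	}
	fmt.Println(mergePerSeries(series)) // [[1 2 3] [4 5 6 7]]
}

In the real function the aliasing concern doesn't arise, because seriesChunksToMatrix consumes the accumulated chunks and returns a fresh stream; that is what makes the chunks = chunks[:0] reuse safe.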