Skip to content

Commit

Permalink
Remove -querier.iterators and -querier.batch-iterators
Browse files Browse the repository at this point in the history
The `-querier.iterators` and `-querier.batch-iterators` configuration
parameters have been removed.

See #5114 for more details.
  • Loading branch information
leizor committed Nov 15, 2023
1 parent 1d6581c commit cc2cbda
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 155 deletions.
22 changes: 0 additions & 22 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1658,28 +1658,6 @@
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "iterators",
"required": false,
"desc": "Use iterators to execute query, as opposed to fully materialising the series in memory.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "querier.iterators",
"fieldType": "boolean",
"fieldCategory": "deprecated"
},
{
"kind": "field",
"name": "batch_iterators",
"required": false,
"desc": "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.batch-iterators",
"fieldType": "boolean",
"fieldCategory": "deprecated"
},
{
"kind": "field",
"name": "query_store_after",
Expand Down
4 changes: 0 additions & 4 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1611,8 +1611,6 @@ Usage of ./cmd/mimir/mimir:
Minimum time to wait for ring stability at startup, if set to positive value. Set to 0 to disable.
-print.config
Print the config and exit.
-querier.batch-iterators
[deprecated] Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag. (default true)
-querier.cardinality-analysis-enabled
Enables endpoints used for cardinality analysis.
-querier.default-evaluation-interval duration
Expand Down Expand Up @@ -1667,8 +1665,6 @@ Usage of ./cmd/mimir/mimir:
Override the expected name on the server certificate.
-querier.id string
Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.
-querier.iterators
[deprecated] Use iterators to execute query, as opposed to fully materialising the series in memory.
-querier.label-names-and-values-results-max-size-bytes int
Maximum size in bytes of distinct label names and values. When querier receives response from ingester, it merges the response with responses from other ingesters. This maximum size limit is applied to the merged(distinct) results. If the limit is reached, an error is returned. (default 419430400)
-querier.label-values-max-cardinality-label-names-per-request int
Expand Down
11 changes: 0 additions & 11 deletions docs/sources/mimir/references/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1168,17 +1168,6 @@ instance_limits:
The `querier` block configures the querier.

```yaml
# (deprecated) Use iterators to execute query, as opposed to fully materialising
# the series in memory.
# CLI flag: -querier.iterators
[iterators: <boolean> | default = false]
# (deprecated) Use batch iterators to execute query, as opposed to fully
# materialising the series in memory. Takes precedent over the
# -querier.iterators flag.
# CLI flag: -querier.batch-iterators
[batch_iterators: <boolean> | default = true]
# (advanced) The time after which a metric should be queried from storage and
# not just ingesters. 0 means all queries are sent to store. If this option is
# enabled, the time range of the query sent to the store-gateway will be
Expand Down
23 changes: 2 additions & 21 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"github.com/prometheus/prometheus/util/annotations"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/querier/batch"
"github.com/grafana/mimir/pkg/querier/engine"
"github.com/grafana/mimir/pkg/querier/iterators"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/lazyquery"
Expand All @@ -40,9 +38,6 @@ import (

// Config contains the configuration require to create a querier
type Config struct {
Iterators bool `yaml:"iterators" category:"deprecated"` // Deprecated: Deprecated in Mimir 2.9.0, remove in Mimir 2.11.0 (https://github.com/grafana/mimir/issues/5107)
BatchIterators bool `yaml:"batch_iterators" category:"deprecated"` // Deprecated: Deprecated in Mimir 2.9.0, remove in Mimir 2.11.0 (https://github.com/grafana/mimir/issues/5107)

// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after" category:"advanced"`
MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future" category:"advanced"`
Expand Down Expand Up @@ -78,10 +73,6 @@ var (
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f)

// TODO: these two flags were deprecated in Mimir 2.9.0, remove them in Mimir 2.11.0 (https://github.com/grafana/mimir/issues/5107)
f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.")
f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", true, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.")

f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.QueryStoreAfter, queryStoreAfterFlag, 12*time.Hour, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
f.BoolVar(&cfg.ShuffleShardingIngestersEnabled, "querier.shuffle-sharding-ingesters-enabled", true, fmt.Sprintf("Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -%s. If this setting is false or -%s is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).", validation.QueryIngestersWithinFlag, validation.QueryIngestersWithinFlag))
Expand Down Expand Up @@ -115,15 +106,6 @@ func (cfg *Config) ValidateLimits(limits validation.Limits) error {
return nil
}

func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
if cfg.BatchIterators {
return batch.NewChunkMergeIterator
} else if cfg.Iterators {
return iterators.NewChunkMergeIterator
}
return mergeChunks
}

// ShouldQueryIngesters provides a check for whether the ingesters will be used for a given query.
func ShouldQueryIngesters(queryIngestersWithin time.Duration, now time.Time, queryMaxT int64) bool {
if queryIngestersWithin != 0 {
Expand All @@ -148,12 +130,11 @@ func ShouldQueryBlockStore(queryStoreAfter time.Duration, now time.Time, queryMi

// New builds a queryable and promql engine.
func New(cfg Config, limits *validation.Overrides, distributor Distributor, storeQueryable storage.Queryable, reg prometheus.Registerer, logger log.Logger, tracker *activitytracker.ActivityTracker) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, *promql.Engine) {
iteratorFunc := getChunksIteratorFunction(cfg)
queryMetrics := stats.NewQueryMetrics(reg)

distributorQueryable := newDistributorQueryable(distributor, iteratorFunc, limits, queryMetrics, logger)
distributorQueryable := newDistributorQueryable(distributor, mergeChunks, limits, queryMetrics, logger)

queryable := newQueryable(distributorQueryable, storeQueryable, iteratorFunc, cfg, limits, queryMetrics, logger)
queryable := newQueryable(distributorQueryable, storeQueryable, mergeChunks, cfg, limits, queryMetrics, logger)
exemplarQueryable := newDistributorExemplarQueryable(distributor, logger)

lazyQueryable := storage.QueryableFunc(func(minT int64, maxT int64) (storage.Querier, error) {
Expand Down
110 changes: 13 additions & 97 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,26 +207,22 @@ func TestQuerier(t *testing.T) {
},
}

for _, query := range queries {
for _, iterators := range []bool{false, true} {
t.Run(fmt.Sprintf("%s/iterators=%t", query.query, iterators), func(t *testing.T) {
// Generate TSDB head used to simulate querying the long-term storage.
db, through := mockTSDB(t, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk), query.valueType)
for _, q := range queries {
t.Run(q.query, func(t *testing.T) {
// Generate TSDB head used to simulate querying the long-term storage.
db, through := mockTSDB(t, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk), q.valueType)

cfg.Iterators = iterators

// No samples returned by ingesters.
distributor := &mockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryResponse{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(client.CombinedQueryStreamResponse{}, nil)
// No samples returned by ingesters.
distributor := &mockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryResponse{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(client.CombinedQueryStreamResponse{}, nil)

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
require.NoError(t, err)
overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
require.NoError(t, err)

queryable, _, _ := New(cfg, overrides, distributor, db, nil, log.NewNopLogger(), nil)
testRangeQuery(t, queryable, through, query)
})
}
queryable, _, _ := New(cfg, overrides, distributor, db, nil, log.NewNopLogger(), nil)
testRangeQuery(t, queryable, through, q)
})
}
}

Expand Down Expand Up @@ -320,86 +316,6 @@ func TestQuerier_QueryableReturnsChunksOutsideQueriedRange(t *testing.T) {
}, m[0].Floats)
}

// TestBatchMergeChunks is a regression test to catch one particular case
// when the Batch merger iterator was corrupting memory by not copying
// Batches by value because the Batch itself was not possible to copy
// by value.
func TestBatchMergeChunks(t *testing.T) {
var (
logger = log.NewNopLogger()
queryStart = mustParseTime("2021-11-01T06:00:00Z")
queryEnd = mustParseTime("2021-11-01T06:01:00Z")
queryStep = time.Second
)

var cfg Config
flagext.DefaultValues(&cfg)
cfg.BatchIterators = true // Always use the Batch iterator - regression test

limits := defaultLimitsConfig()
limits.QueryIngestersWithin = 0 // Always query ingesters in this test.
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

s1 := []mimirpb.Sample{}
s2 := []mimirpb.Sample{}

for i := 0; i < 12; i++ {
s1 = append(s1, mimirpb.Sample{Value: float64(i * 15000), TimestampMs: queryStart.Add(time.Duration(i) * time.Second).UnixMilli()})
if i != 9 { // let series 3 miss a point
s2 = append(s2, mimirpb.Sample{Value: float64(i * 15000), TimestampMs: queryStart.Add(time.Duration(i) * time.Second).UnixMilli()})
}
}

c1 := convertToChunks(t, samplesToInterface(s1))
c2 := convertToChunks(t, samplesToInterface(s2))
chunks12 := []client.Chunk{}
chunks12 = append(chunks12, c1...)
chunks12 = append(chunks12, c2...)

chunks21 := []client.Chunk{}
chunks21 = append(chunks21, c2...)
chunks21 = append(chunks21, c1...)

// Mock distributor to return chunks that need merging.
distributor := &mockDistributor{}
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
client.CombinedQueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
// Series with chunks in the 1,2 order, that need merge
{
Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "one"}, {Name: labels.InstanceName, Value: "foo"}},
Chunks: chunks12,
},
// Series with chunks in the 2,1 order, that need merge
{
Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "one"}, {Name: labels.InstanceName, Value: "bar"}},
Chunks: chunks21,
},
},
},
nil)

engine := promql.NewEngine(promql.EngineOpts{
Logger: logger,
MaxSamples: 1e6,
Timeout: 1 * time.Minute,
})

queryable, _, _ := New(cfg, overrides, distributor, nil, nil, logger, nil)
ctx := user.InjectOrgID(context.Background(), "user-1")
query, err := engine.NewRangeQuery(ctx, queryable, nil, `rate({__name__=~".+"}[10s])`, queryStart, queryEnd, queryStep)
require.NoError(t, err)

r := query.Exec(ctx)
m, err := r.Matrix()
require.NoError(t, err)

require.Equal(t, 2, m.Len())
require.ElementsMatch(t, m[0].Floats, m[1].Floats)
require.ElementsMatch(t, m[0].Histograms, m[1].Histograms)
}

func mockTSDB(t *testing.T, mint model.Time, samples int, step, chunkOffset time.Duration, samplesPerChunk int, valueType func(model.Time) chunkenc.ValueType) (storage.Queryable, model.Time) {
dir := t.TempDir()

Expand Down

0 comments on commit cc2cbda

Please sign in to comment.