diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 064cec7bcf9..9e9bfcf9acf 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1615,6 +1615,8 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024 // QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { + start := time.Now() + if err := i.checkRunning(); err != nil { return err } @@ -1625,6 +1627,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.QueryStream") defer spanlog.Finish() + deadline, deadlineSet := ctx.Deadline() + userID, err := tenant.TenantID(ctx) if err != nil { return err } @@ -1677,6 +1681,13 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ level.Debug(spanlog).Log("msg", "using executeChunksQuery") numSeries, numSamples, err = i.executeChunksQuery(ctx, db, int64(from), int64(through), matchers, shard, stream) } + // For the time being, ignore context cancellation, while we investigate a problem with queries + // getting canceled in ingesters. Prior to https://github.com/grafana/mimir/pull/6085, Prometheus chunk + // queriers actually ignored context, so we are emulating that behavior. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + dumpContextError(ctx, err, start, deadline, deadlineSet, spanlog) + err = nil + } } else { level.Debug(spanlog).Log("msg", "using executeSamplesQuery") numSeries, numSamples, err = i.executeSamplesQuery(ctx, db, int64(from), int64(through), matchers, shard, stream) } @@ -1691,6 +1702,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil } +// dumpContextError logs a context cancellation/deadline error, together with the query's configured timeout, to aid diagnosis.
+func dumpContextError(ctx context.Context, err error, start, deadline time.Time, deadlineSet bool, spanlog *spanlogger.SpanLogger) { + var timeout string + if deadlineSet { + timeout = fmt.Sprintf("%.2f seconds", deadline.Sub(start).Seconds()) + } else { + timeout = "not set" + } + level.Warn(spanlog).Log("msg", "query context error", "cause", context.Cause(ctx), "timeout", timeout, + "err", err) +} + func (i *Ingester) executeSamplesQuery(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) { q, err := db.Querier(from, through) if err != nil { @@ -1799,7 +1822,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t // It's not required to return sorted series because series are sorted by the Mimir querier. ss := q.Select(ctx, false, hints, matchers...) if ss.Err() != nil { - return 0, 0, ss.Err() + return 0, 0, errors.Wrap(ss.Err(), "iterating ChunkSeriesSet") } chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize) @@ -1855,7 +1878,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t // Ensure no error occurred while iterating the series set. if err := ss.Err(); err != nil { - return 0, 0, err + return 0, 0, errors.Wrap(err, "iterating ChunkSeriesSet") } // Final flush any existing metrics @@ -1946,7 +1969,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk // Series must be sorted so that they can be read by the querier in the order the PromQL engine expects. ss := q.Select(ctx, true, hints, matchers...) 
if ss.Err() != nil { - return nil, 0, ss.Err() + return nil, 0, errors.Wrap(ss.Err(), "iterating ChunkSeriesSet") } seriesInBatch := make([]client.QueryStreamSeries, 0, queryStreamBatchSize) @@ -1975,7 +1998,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk chunkCount, err := series.ChunkCount() if err != nil { - return nil, 0, err + return nil, 0, errors.Wrap(err, "getting ChunkSeries chunk count") } seriesInBatch = append(seriesInBatch, client.QueryStreamSeries{ @@ -2006,7 +2029,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk // Ensure no error occurred while iterating the series set. if err := ss.Err(); err != nil { - return nil, 0, err + return nil, 0, errors.Wrap(err, "iterating ChunkSeriesSet") } return allSeriesList, seriesCount, nil