Skip to content

Commit

Permalink
Ingester.QueryStream: Ignore context cancellation for chunk queriers
Browse files Browse the repository at this point in the history
In Ingester.QueryStream, ignore context cancellation wrt. chunk
queriers, the way it worked prior to
#6085. If the context is canceled
though, log it with full diagnostics.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Oct 18, 2023
1 parent 74edb76 commit 998948e
Showing 1 changed file with 28 additions and 5 deletions.
33 changes: 28 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,8 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024

// QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
start := time.Now()

if err := i.checkRunning(); err != nil {
return err
}
Expand All @@ -1625,6 +1627,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.QueryStream")
defer spanlog.Finish()

deadline, deadlineSet := ctx.Deadline()

userID, err := tenant.TenantID(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -1677,6 +1681,13 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
level.Debug(spanlog).Log("msg", "using executeChunksQuery")
numSeries, numSamples, err = i.executeChunksQuery(ctx, db, int64(from), int64(through), matchers, shard, stream)
}
// For the time being, ignore context cancellation, while we investigate a problem with queries
// getting canceled in ingesters. Prior to https://github.com/grafana/mimir/pull/6085, Prometheus chunk
// queriers actually ignored context, so we are emulating that behavior.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
dumpContextError(ctx, err, start, deadline, deadlineSet, spanlog)
err = nil
}
} else {
level.Debug(spanlog).Log("msg", "using executeSamplesQuery")
numSeries, numSamples, err = i.executeSamplesQuery(ctx, db, int64(from), int64(through), matchers, shard, stream)
Expand All @@ -1691,6 +1702,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

// dumpContextError logs diagnostic details for a context error observed while
// serving a query: the cancellation cause, the configured timeout (the span
// from the query's start time to its deadline, if one was set), and the error
// itself.
func dumpContextError(ctx context.Context, err error, start, deadline time.Time, deadlineSet bool, spanlog *spanlogger.SpanLogger) {
	timeout := "not set"
	if deadlineSet {
		timeout = fmt.Sprintf("%.2f seconds", deadline.Sub(start).Seconds())
	}
	level.Warn(spanlog).Log("msg", "query context error", "cause", context.Cause(ctx), "timeout", timeout,
		"err", err)
}

func (i *Ingester) executeSamplesQuery(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.Querier(from, through)
if err != nil {
Expand Down Expand Up @@ -1799,7 +1822,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t
// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(ctx, false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
return 0, 0, errors.Wrap(ss.Err(), "iterating ChunkSeriesSet")
}

chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
Expand Down Expand Up @@ -1855,7 +1878,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return 0, 0, err
return 0, 0, errors.Wrap(err, "iterating ChunkSeriesSet")
}

// Final flush any existing metrics
Expand Down Expand Up @@ -1946,7 +1969,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk
// Series must be sorted so that they can be read by the querier in the order the PromQL engine expects.
ss := q.Select(ctx, true, hints, matchers...)
if ss.Err() != nil {
return nil, 0, ss.Err()
return nil, 0, errors.Wrap(ss.Err(), "iterating ChunkSeriesSet")
}

seriesInBatch := make([]client.QueryStreamSeries, 0, queryStreamBatchSize)
Expand Down Expand Up @@ -1975,7 +1998,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk

chunkCount, err := series.ChunkCount()
if err != nil {
return nil, 0, err
return nil, 0, errors.Wrap(err, "getting ChunkSeries chunk count")
}

seriesInBatch = append(seriesInBatch, client.QueryStreamSeries{
Expand Down Expand Up @@ -2006,7 +2029,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return nil, 0, err
return nil, 0, errors.Wrap(err, "iterating ChunkSeriesSet")
}

return allSeriesList, seriesCount, nil
Expand Down

0 comments on commit 998948e

Please sign in to comment.