diff --git a/storage/remote/client.go b/storage/remote/client.go index 6a873bdd559..4f97b119d45 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -239,8 +239,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { return RecoverableError{err, defaultBackoff} } defer func() { - io.Copy(io.Discard, httpResp.Body) - httpResp.Body.Close() + _, _ = io.Copy(io.Discard, httpResp.Body) + _ = httpResp.Body.Close() }() if httpResp.StatusCode/100 != 2 { @@ -313,11 +313,6 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer func() { - if cancel != nil { - cancel() - } - }() ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient)) defer span.End() @@ -325,6 +320,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) start := time.Now() httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) if err != nil { + cancel() return nil, fmt.Errorf("error sending request: %w", err) } @@ -333,6 +329,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) body, _ := io.ReadAll(httpResp.Body) _ = httpResp.Body.Close() + cancel() return nil, fmt.Errorf("remote server %s returned http status %s: %s", c.urlString, httpResp.Status, string(body)) } @@ -342,13 +339,11 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) case strings.HasPrefix(contentType, "application/x-protobuf"): c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds()) c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc() - return c.handleSampledResponse(req, httpResp, sortSeries) + ss, err := c.handleSampledResponse(req, httpResp, sortSeries) + cancel() + return ss, err case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"): c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds()) - // We copy cancel here so we can nil out the original and prevent the context from being cancelled as soon as - // Read returns. - cancelCopy := cancel - cancel = nil s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil) return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) { @@ -357,11 +352,12 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) code = "aborted_stream" } c.readQueriesTotal.WithLabelValues("chunked", code).Inc() - cancelCopy() + cancel() }), nil default: c.readQueriesDuration.WithLabelValues("unsupported").Observe(time.Since(start).Seconds()) c.readQueriesTotal.WithLabelValues("unsupported", strconv.Itoa(httpResp.StatusCode)).Inc() + cancel() return nil, fmt.Errorf("unsupported content type: %s", contentType) } }