Skip to content

Commit

Permalink
Explicitly call cancel() when needed
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Lei <lei.justin@gmail.com>
  • Loading branch information
leizor committed Apr 26, 2024
1 parent 22b8c54 commit 8961556
Showing 1 changed file with 9 additions and 13 deletions.
22 changes: 9 additions & 13 deletions storage/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
return RecoverableError{err, defaultBackoff}
}
defer func() {
io.Copy(io.Discard, httpResp.Body)
httpResp.Body.Close()
_, _ = io.Copy(io.Discard, httpResp.Body)
_ = httpResp.Body.Close()
}()

if httpResp.StatusCode/100 != 2 {
Expand Down Expand Up @@ -313,18 +313,14 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer func() {
if cancel != nil {
cancel()
}
}()

ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient))
defer span.End()

start := time.Now()
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("error sending request: %w", err)
}

Expand All @@ -333,6 +329,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
body, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()

cancel()
return nil, fmt.Errorf("remote server %s returned http status %s: %s", c.urlString, httpResp.Status, string(body))
}

Expand All @@ -342,13 +339,11 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
case strings.HasPrefix(contentType, "application/x-protobuf"):
c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc()
return c.handleSampledResponse(req, httpResp, sortSeries)
ss, err := c.handleSampledResponse(req, httpResp, sortSeries)
cancel()
return ss, err
case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"):
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
// We copy cancel here so we can nil out the original and prevent the context from being cancelled as soon as
// Read returns.
cancelCopy := cancel
cancel = nil

s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) {
Expand All @@ -357,11 +352,12 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool)
code = "aborted_stream"
}
c.readQueriesTotal.WithLabelValues("chunked", code).Inc()
cancelCopy()
cancel()
}), nil
default:
c.readQueriesDuration.WithLabelValues("unsupported").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("unsupported", strconv.Itoa(httpResp.StatusCode)).Inc()
cancel()
return nil, fmt.Errorf("unsupported content type: %s", contentType)
}
}
Expand Down

0 comments on commit 8961556

Please sign in to comment.