Skip to content

Commit

Permalink
Handle context.Canceled in active series requests (#7102)
Browse files Browse the repository at this point in the history
* Handle context.Canceled errors in callback for `forReplicationSet`
  • Loading branch information
flxbk committed Jan 11, 2024
1 parent 105b861 commit cc3a1b3
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1832,11 +1832,20 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match
res := newActiveSeriesResponse(d.hashCollisionCount, maxResponseSize)

ingesterQuery := func(ctx context.Context, client ingester_client.IngesterClient) (any, error) {
// This function is invoked purely for its side effects on the captured
// activeSeriesResponse, its return value is never used.
type ignored struct{}

log, ctx := spanlogger.NewWithLogger(ctx, d.log, "Distributor.ActiveSeries.queryIngester")
defer log.Finish()

stream, err := client.ActiveSeries(ctx, req)
if err != nil {
if errors.Is(err, context.Canceled) {
return ignored{}, nil
}
level.Error(log).Log("msg", "error creating active series response stream", "err", err)
ext.Error.Set(log.Span, true)
return nil, err
}

Expand All @@ -1849,9 +1858,13 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match

for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
} else if err != nil {
if err != nil {
if errors.Is(err, io.EOF) {
break
}
if errors.Is(err, context.Canceled) {
return ignored{}, nil
}
level.Error(log).Log("msg", "error receiving active series response", "err", err)
ext.Error.Set(log.Span, true)
return nil, err
Expand All @@ -1863,7 +1876,7 @@ func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Match
}
}

return nil, nil
return ignored{}, nil
}

_, err = forReplicationSet(ctx, d, replicationSet, ingesterQuery)
Expand Down

0 comments on commit cc3a1b3

Please sign in to comment.