Receive: fix serverAsClient.Series goroutines leak #6948

Merged · 6 commits · May 20, 2024
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup
- [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants.
- [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity.
- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api.

### Added

2 changes: 0 additions & 2 deletions pkg/store/bucket.go
@@ -1571,7 +1571,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
@@ -1585,7 +1584,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
139 changes: 56 additions & 83 deletions pkg/store/proxy_merge.go
@@ -211,13 +211,11 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
storeLabels map[string]struct{}
frameTimeout time.Duration
ctx context.Context

// Internal bookkeeping.
dataOrFinishEvent *sync.Cond
@@ -294,7 +292,6 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse {
}

func newLazyRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
@@ -311,12 +308,10 @@ func newLazyRespSet(

respSet := &lazyRespSet{
frameTimeout: frameTimeout,
cl: cl,
storeName: storeName,
storeLabelSets: storeLabelSets,
closeSeries: closeSeries,
span: span,
ctx: ctx,
dataOrFinishEvent: dataAvailable,
bufferedResponsesMtx: bufferedResponsesMtx,
bufferedResponses: bufferedResponses,
@@ -353,19 +348,9 @@ func newLazyRespSet(
defer t.Reset(frameTimeout)
}

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st)
l.span.SetTag("err", err.Error())
resp, err := cl.Recv()

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
default:
resp, err := cl.Recv()
if err != nil {
if err == io.EOF {
l.bufferedResponsesMtx.Lock()
l.noMoreData = true
Expand All @@ -374,45 +359,43 @@ func newLazyRespSet(
return false
}

if err != nil {
// TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled.
var rerr error
if t != nil && !t.Stop() && errors.Is(err, context.Canceled) {
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.
var rerr error
// If timer is already stopped
if t != nil && !t.Stop() {
if errors.Is(err, context.Canceled) {
// The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st)
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}

l.span.SetTag("err", rerr.Error())

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}
l.span.SetTag("err", rerr.Error())

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return true
}

var t *time.Timer
@@ -509,7 +492,6 @@ func newAsyncRespSet(
switch retrievalStrategy {
case LazyRetrieval:
return newLazyRespSet(
seriesCtx,
span,
frameTimeout,
st.String(),
Expand All @@ -522,7 +504,6 @@ func newAsyncRespSet(
), nil
case EagerRetrieval:
return newEagerRespSet(
seriesCtx,
span,
frameTimeout,
st.String(),
@@ -556,8 +537,6 @@ func (l *lazyRespSet) Close() {
type eagerRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
ctx context.Context

closeSeries context.CancelFunc
frameTimeout time.Duration
@@ -576,7 +555,6 @@ type eagerRespSet struct {
}

func newEagerRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
@@ -591,9 +569,7 @@ func newEagerRespSet(
ret := &eagerRespSet{
span: span,
closeSeries: closeSeries,
cl: cl,
frameTimeout: frameTimeout,
ctx: ctx,
bufferedResponses: []*storepb.SeriesResponse{},
wg: &sync.WaitGroup{},
shardMatcher: shardMatcher,
@@ -638,48 +614,45 @@ func newEagerRespSet(
defer t.Reset(frameTimeout)
}

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.span.SetTag("err", err.Error())
return false
default:
resp, err := cl.Recv()
resp, err := cl.Recv()

if err != nil {
if err == io.EOF {
return false
}
if err != nil {
// TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled.
var rerr error
if t != nil && !t.Stop() && errors.Is(err, context.Canceled) {
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.

var rerr error
// If timer is already stopped
if t != nil && !t.Stop() {
<-t.C // Drain the channel if it was already stopped.
if errors.Is(err, context.Canceled) {
// The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
return false
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
return false
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}
numResponses++
bytesProcessed += resp.Size()

l.bufferedResponses = append(l.bufferedResponses, resp)
if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}

l.bufferedResponses = append(l.bufferedResponses, resp)
return true
}

var t *time.Timer
if frameTimeout > 0 {
t = time.AfterFunc(frameTimeout, closeSeries)
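For context (not part of the PR diff): in both response sets the per-Recv frame timeout is armed with time.AfterFunc(frameTimeout, closeSeries), so when the timer fires it cancels the series stream and the next cl.Recv() fails with context.Canceled. That is why the error handling above reads "t.Stop() returned false" combined with a context.Canceled error as the frame timeout having been reached. A minimal standalone sketch of that timer semantics, with illustrative names only (not Thanos code):

package main

import (
  "context"
  "errors"
  "fmt"
  "time"
)

func main() {
  // closeSeries stands in for the CancelFunc that aborts the gRPC series stream.
  ctx, closeSeries := context.WithCancel(context.Background())
  frameTimeout := 50 * time.Millisecond

  // Arm the frame timeout, roughly as the response sets above do
  // (they Reset the timer after each received frame).
  t := time.AfterFunc(frameTimeout, closeSeries)

  <-ctx.Done() // stand-in for cl.Recv() returning after the stream was cancelled

  // Stop returns false if the timer already fired, so this combination is read
  // as "the per-Recv timeout was reached" rather than an upstream cancellation.
  if !t.Stop() && errors.Is(ctx.Err(), context.Canceled) {
    fmt.Println("frame timeout reached: no data received within frameTimeout")
  }
}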
10 changes: 5 additions & 5 deletions pkg/store/storepb/inprocess.go
@@ -36,7 +36,9 @@ func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc
inSrv := &inProcessStream{recv: make(chan *SeriesResponse), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
inSrv.err <- s.srv.Series(in, inSrv)
if err := s.srv.Series(in, inSrv); err != nil {
inSrv.err <- err
}
close(inSrv.err)
close(inSrv.recv)
}()
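The goroutine change above is the core of the fix: previously the goroutine did an unconditional blocking send, inSrv.err <- s.srv.Series(in, inSrv), on an unbuffered channel, so if the Recv() side had already returned (for example after its context was cancelled) the value was never read and the goroutine could leak. The new version sends only non-nil errors and then closes both channels, which the consumer observes as io.EOF. A standalone sketch of the leak mechanism, with hypothetical names (not the Thanos code):

package main

import (
  "errors"
  "fmt"
  "runtime"
  "time"
)

func main() {
  errCh := make(chan error) // unbuffered, like inSrv.err

  // Pre-fix pattern: unconditional send. If the reader never drains errCh,
  // this goroutine blocks forever.
  go func() {
    errCh <- errors.New("series request failed")
  }()

  // The reader gives up without reading errCh (e.g. its context was cancelled
  // and the old Recv() returned ctx.Err() instead of draining the channel).
  time.Sleep(100 * time.Millisecond)
  fmt.Println("goroutines:", runtime.NumGoroutine()) // still 2: the sender is stuck

  // Post-fix pattern: send only a non-nil error, then close(errCh) and close(recv),
  // so the sender always terminates and a closed channel reads as io.EOF downstream.
}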
@@ -88,15 +90,13 @@ func (s *inProcessClientStream) CloseSend() error {

func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
select {
case <-s.srv.ctx.Done():
return nil, s.srv.ctx.Err()
case r, ok := <-s.srv.recv:
if !ok {
return nil, io.EOF
}
return r, nil
case err := <-s.srv.err:
if err == nil {
case err, ok := <-s.srv.err:
Member:

Could this lead to problems if s.srv.err is closed while we are continuously calling Recv() and there's something to read from s.srv.recv? In other words, from my understanding select{} chooses randomly if there are multiple possibilities so with s.srv.err closed we could prematurely return io.EOF, no? (https://go.dev/ref/spec#Select_statements, see second point). I think the same applies to the case r, ok branch. My suggestion would be to give a priority to s.srv.recv.

select {
  case r, ok := <-s.srv.recv:
    if !ok {
      break
    }
    return r, nil
}

err, ok := <-s.srv.err
if ok {
  return nil, err
}
return nil, io.EOF

@thibaultmg (Contributor Author), May 2, 2024:

I don't think that the race scenario you describe can happen here:

  • The s.srv.err channel can only be closed when s.srv.Series(in, inSrv) returns.
  • The Series() method only returns after it has finished pushing data into the unbuffered s.srv.recv channel.
  • Consequently, Series() only returns once the last bits of data have been consumed by the Recv() caller.
  • Making the suggested race impossible.

Regarding the proposed change to give priority to s.srv.recv, I see a potential issue if the Series() call returns an error:

  • Recv() will block on reading s.srv.recv which is not yet closed.
  • Series() will block trying to write the error into the unbuffered err channel.
  • This results in neither channel being closed, leading to a deadlock (see the sketch after this comment).

WDYT @GiedriusS ?
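
A standalone sketch (hypothetical names, not from the PR) of the deadlock described in the bullets above: if Series() fails before sending any data and the consumer waits only on the recv channel, producer and consumer block on different unbuffered channels and neither side can ever close anything.

package main

import (
  "errors"
  "fmt"
  "time"
)

func main() {
  recv := make(chan string) // unbuffered, like inSrv.recv
  errCh := make(chan error) // unbuffered, like inSrv.err

  // Producer: Series() fails before producing any response.
  go func() {
    errCh <- errors.New("series failed") // blocks: nobody reads errCh first
    close(errCh)
    close(recv)
  }()

  // Consumer with strict priority on recv: it waits on a channel that is never
  // written to or closed, because the producer is stuck sending on errCh.
  select {
  case r := <-recv:
    fmt.Println("got", r)
  case <-time.After(time.Second):
    // The timeout only exists so this sketch terminates; the real code would hang.
    fmt.Println("deadlock: neither goroutine can make progress")
  }
}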

if !ok {
return nil, io.EOF
}
return nil, err
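
Putting the surviving hunks together (the conversation above splits them), the Recv() that results from this change reads roughly as follows; this is a sketch reassembled from the diff, not a verbatim copy of the merged file. Once the server goroutine has closed both channels, either case yields io.EOF instead of blocking or returning a stale context error.

func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
  select {
  case r, ok := <-s.srv.recv:
    if !ok {
      return nil, io.EOF // recv closed: the stream finished cleanly
    }
    return r, nil
  case err, ok := <-s.srv.err:
    if !ok {
      return nil, io.EOF // err closed with nothing sent: no error to report
    }
    return nil, err
  }
}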