diff --git a/blob/service.go b/blob/service.go index 12ef3ea828..7016269c07 100644 --- a/blob/service.go +++ b/blob/service.go @@ -63,6 +63,7 @@ type Service struct { // activeSubscriptions tracks the number of active subscriptions activeSubscriptions sync.WaitGroup + stopped bool mu sync.Mutex } @@ -88,6 +89,7 @@ func (s *Service) Start(context.Context) error { func (s *Service) Stop(context.Context) error { s.mu.Lock() if s.cancel != nil { + s.stopped = true s.cancel() } s.mu.Unlock() @@ -103,6 +105,10 @@ type SubscriptionResponse struct { func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) { s.mu.Lock() + if s.stopped { + s.mu.Unlock() + return nil, fmt.Errorf("service has been stopped") + } if s.ctx == nil { s.mu.Unlock() return nil, fmt.Errorf("service has not been started") @@ -125,6 +131,7 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su select { case header, ok := <-headerCh: if ctx.Err() != nil { + log.Debug("blobsub: cancelling subscription due to user context closing") return } if !ok { @@ -134,6 +141,7 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su blobs, err := s.getAll(ctx, header, []share.Namespace{ns}) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { // context canceled, continuing would lead to unexpected missed heights for the client + log.Debug("blobsub: cancelling subscription due to user context closing") return } if err != nil { @@ -143,12 +151,15 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su select { case <-ctx.Done(): + log.Debug("blobsub: cancelling subscription with pending response due to user context closing") return case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height()}: } case <-ctx.Done(): + log.Debug("blobsub: cancelling subscription due to user context closing") return case <-s.ctx.Done(): + log.Debug("blobsub: cancelling subscription due to service context closing") return } } @@ -252,7 +263,7 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share. func (s *Service) getAll( ctx context.Context, header *header.ExtendedHeader, - namespaces []share.Namespace + namespaces []share.Namespace, ) ([]*Blob, error) { height := header.Height() var (