Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jul 24, 2024
1 parent 49b6b92 commit da7c540
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Service struct {

// activeSubscriptions tracks the number of active subscriptions
activeSubscriptions sync.WaitGroup
stopped bool
mu sync.Mutex
}

Expand All @@ -88,6 +89,7 @@ func (s *Service) Start(context.Context) error {
func (s *Service) Stop(context.Context) error {
s.mu.Lock()
if s.cancel != nil {
s.stopped = true
s.cancel()
}
s.mu.Unlock()
Expand All @@ -103,6 +105,10 @@ type SubscriptionResponse struct {

func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) {
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return nil, fmt.Errorf("service has been stopped")
}
if s.ctx == nil {
s.mu.Unlock()
return nil, fmt.Errorf("service has not been started")
Expand All @@ -125,6 +131,7 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su
select {
case header, ok := <-headerCh:
if ctx.Err() != nil {
log.Debug("blobsub: cancelling subscription due to user context closing")
return
}
if !ok {
Expand All @@ -134,6 +141,7 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su
blobs, err := s.getAll(ctx, header, []share.Namespace{ns})
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// context canceled, continuing would lead to unexpected missed heights for the client
log.Debug("blobsub: cancelling subscription due to user context closing")
return
}
if err != nil {
Expand All @@ -143,12 +151,15 @@ func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *Su

select {
case <-ctx.Done():
log.Debug("blobsub: cancelling subscription with pending response due to user context closing")
return
case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height()}:
}
case <-ctx.Done():
log.Debug("blobsub: cancelling subscription due to user context closing")
return
case <-s.ctx.Done():
log.Debug("blobsub: cancelling subscription due to service context closing")
return
}
}
Expand Down Expand Up @@ -252,7 +263,7 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
func (s *Service) getAll(
ctx context.Context,
header *header.ExtendedHeader,
namespaces []share.Namespace
namespaces []share.Namespace,
) ([]*Blob, error) {
height := header.Height()
var (
Expand Down

0 comments on commit da7c540

Please sign in to comment.