Skip to content

Commit

Permalink
feat(blob): blobsub (celestiaorg#3539)
Browse files Browse the repository at this point in the history
Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
  • Loading branch information
2 people authored and sebasti810 committed Aug 14, 2024
1 parent c1b6d74 commit dbaa159
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 28 deletions.
108 changes: 105 additions & 3 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,119 @@ type Submitter interface {
}

type Service struct {
// ctx represents the Service's lifecycle context.
ctx context.Context
cancel context.CancelFunc
// accessor dials the given celestia-core endpoint to submit blobs.
blobSubmitter Submitter
// shareGetter retrieves the EDS to fetch all shares from the requested header.
shareGetter share.Getter
// headerGetter fetches header by the provided height
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
// headerSub subscribes to new headers to supply to blob subscriptions.
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error)
}

func NewService(
submitter Submitter,
getter share.Getter,
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error),
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error),
) *Service {
return &Service{
blobSubmitter: submitter,
shareGetter: getter,
headerGetter: headerGetter,
headerSub: headerSub,
}
}

func (s *Service) Start(context.Context) error {
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}

func (s *Service) Stop(context.Context) error {
s.cancel()
return nil
}

// SubscriptionResponse is the response type for the Subscribe method.
// It contains the blobs and the height at which they were included.
// If the Blobs slice is empty, it means that no blobs were included at the given height.
type SubscriptionResponse struct {
Blobs []*Blob
Height uint64
}

// Subscribe returns a channel that will receive SubscriptionResponse objects.
// The channel will be closed when the context is canceled or the service is stopped.
// Please note that no errors are returned: underlying operations are retried until successful.
// Additionally, not reading from the returned channel will cause the stream to close after 16 messages.
func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) {
if s.ctx == nil {
return nil, fmt.Errorf("service has not been started")
}

headerCh, err := s.headerSub(ctx)
if err != nil {
return nil, err
}

blobCh := make(chan *SubscriptionResponse, 16)
go func() {
defer close(blobCh)

for {
select {
case header, ok := <-headerCh:
if ctx.Err() != nil {
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
}
if !ok {
log.Errorw("header channel closed for subscription", "namespace", ns.ID())
return
}
// close subscription before buffer overflows
if len(blobCh) == cap(blobCh) {
log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID())
return
}

var blobs []*Blob
var err error
for {
blobs, err = s.getAll(ctx, header, []share.Namespace{ns})
if ctx.Err() != nil {
// context canceled, continuing would lead to unexpected missed heights for the client
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
}
if err == nil {
// operation successful, break the loop
break
}
}

select {
case <-ctx.Done():
log.Debugw("blobsub: pending response canceled due to user ctx closing", "namespace", ns.ID())
return
case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height()}:
}
case <-ctx.Done():
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
case <-s.ctx.Done():
log.Debugw("blobsub: canceling subscription due to service ctx closing", "namespace", ns.ID())
return
}
}
}()
return blobCh, nil
}

// Submit sends PFB transaction and reports the height at which it was included.
// Allows sending multiple Blobs atomically synchronously.
// Uses default wallet registered on the Node.
Expand Down Expand Up @@ -148,8 +241,8 @@ func (s *Service) GetProof(
// If all blobs were found without any errors, the user will receive a list of blobs.
// If the BlobService couldn't find any blobs under the requested namespaces,
// the user will receive an empty list of blobs along with an empty error.
// If some of the requested namespaces were not found, the user will receive all the found blobs and an empty error.
// If there were internal errors during some of the requests,
// If some of the requested namespaces were not found, the user will receive all the found blobs
// and an empty error. If there were internal errors during some of the requests,
// the user will receive all found blobs along with a combined error message.
//
// All blobs will preserve the order of the namespaces that were requested.
Expand All @@ -159,6 +252,15 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
return nil, err
}

return s.getAll(ctx, header, namespaces)
}

func (s *Service) getAll(
ctx context.Context,
header *header.ExtendedHeader,
namespaces []share.Namespace,
) ([]*Blob, error) {
height := header.Height()
var (
resultBlobs = make([][]*Blob, len(namespaces))
resultErr = make([]error, len(namespaces))
Expand All @@ -184,7 +286,7 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
wg.Wait()

blobs := slices.Concat(resultBlobs...)
err = errors.Join(resultErr...)
err := errors.Join(resultErr...)
return blobs, err
}

Expand Down
Loading

0 comments on commit dbaa159

Please sign in to comment.