Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blob): blobsub #3539

Merged
merged 18 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,119 @@ type Submitter interface {
}

type Service struct {
// ctx represents the Service's lifecycle context.
ctx context.Context
cancel context.CancelFunc
// accessor dials the given celestia-core endpoint to submit blobs.
blobSubmitter Submitter
// shareGetter retrieves the EDS to fetch all shares from the requested header.
shareGetter share.Getter
// headerGetter fetches header by the provided height
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error)
// headerSub subscribes to new headers to supply to blob subscriptions.
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error)
}

func NewService(
submitter Submitter,
getter share.Getter,
headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error),
headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error),
) *Service {
return &Service{
blobSubmitter: submitter,
shareGetter: getter,
headerGetter: headerGetter,
headerSub: headerSub,
}
}

func (s *Service) Start(context.Context) error {
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}

func (s *Service) Stop(context.Context) error {
s.cancel()
return nil
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

// SubscriptionResponse is the response type for the Subscribe method.
// It contains the blobs and the height at which they were included.
// If the Blobs slice is empty, it means that no blobs were included at the given height.
type SubscriptionResponse struct {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
Blobs []*Blob
Height uint64
}

// Subscribe returns a channel that will receive SubscriptionResponse objects.
// The channel will be closed when the context is canceled or the service is stopped.
// Please note that no errors are returned: underlying operations are retried until successful.
// Additionally, not reading from the returned channel will cause the stream to close after 16 messages.
func (s *Service) Subscribe(ctx context.Context, ns share.Namespace) (<-chan *SubscriptionResponse, error) {
if s.ctx == nil {
return nil, fmt.Errorf("service has not been started")
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

headerCh, err := s.headerSub(ctx)
if err != nil {
return nil, err
}

blobCh := make(chan *SubscriptionResponse, 16)
go func() {
defer close(blobCh)

for {
select {
case header, ok := <-headerCh:
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
if ctx.Err() != nil {
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
}
if !ok {
log.Errorw("header channel closed for subscription", "namespace", ns.ID())
return
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved
// close subscription before buffer overflows
if len(blobCh) == cap(blobCh) {
log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID())
return
}

var blobs []*Blob
var err error
for {
blobs, err = s.getAll(ctx, header, []share.Namespace{ns})
if ctx.Err() != nil {
// context canceled, continuing would lead to unexpected missed heights for the client
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
}
if err == nil {
// operation successful, break the loop
break
}
}

select {
case <-ctx.Done():
log.Debugw("blobsub: pending response canceled due to user ctx closing", "namespace", ns.ID())
return
case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height()}:
}
case <-ctx.Done():
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
case <-s.ctx.Done():
log.Debugw("blobsub: canceling subscription due to service ctx closing", "namespace", ns.ID())
return
}
}
}()
return blobCh, nil
}

// Submit sends PFB transaction and reports the height at which it was included.
// Allows sending multiple Blobs atomically synchronously.
// Uses default wallet registered on the Node.
Expand Down Expand Up @@ -148,8 +241,8 @@ func (s *Service) GetProof(
// If all blobs were found without any errors, the user will receive a list of blobs.
// If the BlobService couldn't find any blobs under the requested namespaces,
// the user will receive an empty list of blobs along with an empty error.
// If some of the requested namespaces were not found, the user will receive all the found blobs and an empty error.
// If there were internal errors during some of the requests,
// If some of the requested namespaces were not found, the user will receive all the found blobs
// and an empty error. If there were internal errors during some of the requests,
// the user will receive all found blobs along with a combined error message.
//
// All blobs will preserve the order of the namespaces that were requested.
Expand All @@ -159,6 +252,15 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
return nil, err
}

return s.getAll(ctx, header, namespaces)
}

func (s *Service) getAll(
ctx context.Context,
header *header.ExtendedHeader,
namespaces []share.Namespace,
) ([]*Blob, error) {
height := header.Height()
var (
resultBlobs = make([][]*Blob, len(namespaces))
resultErr = make([]error, len(namespaces))
Expand All @@ -184,7 +286,7 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []share.
wg.Wait()

blobs := slices.Concat(resultBlobs...)
err = errors.Join(resultErr...)
err := errors.Join(resultErr...)
return blobs, err
}

Expand Down
Loading
Loading