diff --git a/simple/provider.go b/simple/provider.go index 8fa9c10..3b4c882 100644 --- a/simple/provider.go +++ b/simple/provider.go @@ -7,17 +7,16 @@ import ( "context" "time" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" q "github.com/ipfs/go-ipfs-provider/queue" logging "github.com/ipfs/go-log" - routing "github.com/libp2p/go-libp2p-core/routing" + "github.com/libp2p/go-libp2p-core/routing" ) var logP = logging.Logger("provider.simple") const ( provideOutgoingWorkerLimit = 8 - provideTimeout = 3 * time.Minute ) // Provider announces blocks to the network @@ -27,15 +26,31 @@ type Provider struct { queue *q.Queue // used to announce providing to the network contentRouting routing.ContentRouting + // how long to wait for announce to complete before giving up + timeout time.Duration +} + +type Option func(*Provider) + +func WithTimeout(timeout time.Duration) Option { + return func(p *Provider) { + p.timeout = timeout + } } // NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting) *Provider { - return &Provider{ +func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { + p := &Provider{ ctx: ctx, queue: queue, contentRouting: contentRouting, } + + for _, option := range options { + option(p) + } + + return p } // Close stops the provider @@ -64,7 +79,7 @@ func (p *Provider) handleAnnouncements() { case <-p.ctx.Done(): return case c := <-p.queue.Dequeue(): - ctx, cancel := context.WithTimeout(p.ctx, provideTimeout) + ctx, cancel := context.WithTimeout(p.ctx, p.timeout) defer cancel() logP.Info("announce - start - ", c) if err := p.contentRouting.Provide(ctx, c, true); err != nil {