diff --git a/das/daser.go b/das/daser.go
index 857dc530e6..b4ba434d24 100644
--- a/das/daser.go
+++ b/das/daser.go
@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"sync/atomic"
-	"time"
 
 	"github.com/ipfs/go-datastore"
 	logging "github.com/ipfs/go-log/v2"
@@ -14,6 +13,7 @@ import (
 	libhead "github.com/celestiaorg/go-header"
 
 	"github.com/celestiaorg/celestia-node/header"
+	"github.com/celestiaorg/celestia-node/pruner"
 	"github.com/celestiaorg/celestia-node/share"
 	"github.com/celestiaorg/celestia-node/share/eds/byzantine"
 	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
@@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error {
 func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
 	// short-circuit if pruning is enabled and the header is outside the
 	// availability window
-	if !d.isWithinSamplingWindow(h) {
+	if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) {
 		log.Debugw("skipping header outside sampling window", "height", h.Height(),
 			"time", h.Time())
 		return errOutsideSamplingWindow
@@ -183,14 +183,6 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
 	return nil
 }
 
-func (d *DASer) isWithinSamplingWindow(eh *header.ExtendedHeader) bool {
-	// if sampling window is not set, then all headers are within the window
-	if d.params.samplingWindow.Duration() == 0 {
-		return true
-	}
-	return time.Since(eh.Time()) <= d.params.samplingWindow.Duration()
-}
-
 // SamplingStats returns the current statistics over the DA sampling process.
 func (d *DASer) SamplingStats(ctx context.Context) (SamplingStats, error) {
 	return d.sampler.stats(ctx)
diff --git a/das/daser_test.go b/das/daser_test.go
index bce4a98252..04517125fb 100644
--- a/das/daser_test.go
+++ b/das/daser_test.go
@@ -280,7 +280,11 @@ func TestDASer_SamplingWindow(t *testing.T) {
 			eh := headertest.RandExtendedHeader(t)
 			eh.RawHeader.Time = tt.timestamp
 
-			assert.Equal(t, tt.withinWindow, daser.isWithinSamplingWindow(eh))
+			assert.Equal(
+				t,
+				tt.withinWindow,
+				pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow),
+			)
 		})
 	}
 }
diff --git a/nodebuilder/node.go b/nodebuilder/node.go
index b16a376cc1..9ec1c4d4e0 100644
--- a/nodebuilder/node.go
+++ b/nodebuilder/node.go
@@ -30,6 +30,7 @@ import (
 	"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
 	"github.com/celestiaorg/celestia-node/nodebuilder/share"
 	"github.com/celestiaorg/celestia-node/nodebuilder/state"
+	"github.com/celestiaorg/celestia-node/share/eds"
 )
 
 var (
@@ -56,6 +57,9 @@ type Node struct {
 	RPCServer     *rpc.Server     // not optional
 	GatewayServer *gateway.Server `optional:"true"`
 
+	// block store
+	EDSStore *eds.Store `optional:"true"`
+
 	// p2p components
 	Host      host.Host
 	ConnGater *conngater.BasicConnectionGater
diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go
index 76d0ebb430..eb97e6ed17 100644
--- a/nodebuilder/pruner/module.go
+++ b/nodebuilder/pruner/module.go
@@ -15,31 +15,13 @@ import (
 )
 
 func ConstructModule(tp node.Type, cfg *Config) fx.Option {
-	if !cfg.EnableService {
-		switch tp {
-		case node.Light:
-			// light nodes are still subject to sampling within window
-			// even if pruning is not enabled.
- return fx.Options( - fx.Supply(light.Window), - ) - case node.Full: - return fx.Options( - fx.Supply(archival.Window), - ) - case node.Bridge: - return fx.Options( - fx.Supply(archival.Window), - fx.Provide(func() []core.Option { - return []core.Option{} - }), - ) - default: - panic("unknown node type") - } - } - baseComponents := fx.Options( + fx.Supply(cfg), + availWindow(tp, cfg.EnableService), + ) + + fullAndBridgeComponents := fx.Options( + baseComponents, fx.Provide(fx.Annotate( newPrunerService, fx.OnStart(func(ctx context.Context, p *pruner.Service) error { @@ -52,34 +34,61 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option { // This is necessary to invoke the pruner service as independent thanks to a // quirk in FX. fx.Invoke(func(_ *pruner.Service) {}), + fx.Provide(func(store *eds.Store) pruner.Pruner { + return full.NewPruner(store) + }), ) switch tp { - case node.Full: + // TODO: Eventually, light nodes will be capable of pruning samples + // in which case, this can be enabled. + case node.Light: return fx.Module("prune", baseComponents, - fx.Provide(func(store *eds.Store) pruner.Pruner { - return full.NewPruner(store) - }), - fx.Supply(full.Window), + ) + case node.Full: + opts := baseComponents + if cfg.EnableService { + opts = fullAndBridgeComponents + } + return fx.Module("prune", + opts, ) case node.Bridge: + if cfg.EnableService { + return fx.Module("prune", + fullAndBridgeComponents, + fx.Provide(func(window pruner.AvailabilityWindow) []core.Option { + return []core.Option{core.WithAvailabilityWindow(window)} + }), + ) + } return fx.Module("prune", baseComponents, - fx.Provide(func(store *eds.Store) pruner.Pruner { - return full.NewPruner(store) - }), - fx.Supply(full.Window), - fx.Provide(func(window pruner.AvailabilityWindow) []core.Option { - return []core.Option{core.WithAvailabilityWindow(window)} + fx.Provide(func() []core.Option { + return []core.Option{} }), ) - // TODO: Eventually, light nodes will be capable of pruning samples - // in which case, this can be enabled. + default: + panic("unknown node type") + } +} + +func availWindow(tp node.Type, pruneEnabled bool) fx.Option { + switch tp { case node.Light: - return fx.Module("prune", - fx.Supply(light.Window), - ) + // light nodes are still subject to sampling within window + // even if pruning is not enabled. + return fx.Provide(func() pruner.AvailabilityWindow { + return light.Window + }) + case node.Full, node.Bridge: + return fx.Provide(func() pruner.AvailabilityWindow { + if pruneEnabled { + return full.Window + } + return archival.Window + }) default: panic("unknown node type") } diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 96be2b5d20..12c6b9c628 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -6,9 +6,6 @@ import ( "github.com/filecoin-project/dagstore" "github.com/ipfs/boxo/blockservice" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/routing" - routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/celestiaorg/celestia-app/pkg/da" @@ -16,32 +13,8 @@ import ( "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/ipld" - disc "github.com/celestiaorg/celestia-node/share/p2p/discovery" - "github.com/celestiaorg/celestia-node/share/p2p/peers" ) -const ( - // fullNodesTag is the tag used to identify full nodes in the discovery service. 
- fullNodesTag = "full" -) - -func newDiscovery(cfg *disc.Parameters, -) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) { - return func( - r routing.ContentRouting, - h host.Host, - manager *peers.Manager, - ) (*disc.Discovery, error) { - return disc.NewDiscovery( - cfg, - h, - routingdisc.NewRoutingDiscovery(r), - fullNodesTag, - disc.WithOnPeersUpdate(manager.UpdateNodePool), - ) - } -} - func newShareModule(getter share.Getter, avail share.Availability) Module { return &module{getter, avail} } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index b81149535f..06dd5c1d2b 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -5,21 +5,16 @@ import ( "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" - libhead "github.com/celestiaorg/go-header" - "github.com/celestiaorg/go-header/sync" - - "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/getters" - disc "github.com/celestiaorg/celestia-node/share/p2p/discovery" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" @@ -35,20 +30,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Error(cfgErr), fx.Options(options...), fx.Provide(newShareModule), - peerManagerComponents(tp, cfg), - discoveryComponents(cfg), - shrexSubComponents(), - ) - - bridgeAndFullComponents := fx.Options( - fx.Provide(getters.NewStoreGetter), - shrexServerComponents(cfg), - edsStoreComponents(cfg), - fullAvailabilityComponents(), - shrexGetterComponents(cfg), - fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn { - return shrexSub.Broadcast - }), + availabilityComponents(tp, cfg), + shrexComponents(tp, cfg), + peerComponents(tp, cfg), ) switch tp { @@ -56,24 +40,14 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return fx.Module( "share", baseComponents, - bridgeAndFullComponents, - fx.Provide(func() peers.Parameters { - return cfg.PeerManagerParams - }), + edsStoreComponents(cfg), fx.Provide(bridgeGetter), - fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error { - lc.Append(fx.Hook{ - OnStart: sub.Start, - OnStop: sub.Stop, - }) - return nil - }), ) case node.Full: return fx.Module( "share", baseComponents, - bridgeAndFullComponents, + edsStoreComponents(cfg), fx.Provide(getters.NewIPLDGetter), fx.Provide(fullGetter), ) @@ -81,84 +55,21 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return fx.Module( "share", baseComponents, - shrexGetterComponents(cfg), - lightAvailabilityComponents(cfg), fx.Invoke(ensureEmptyEDSInBS), fx.Provide(getters.NewIPLDGetter), fx.Provide(lightGetter), - // shrexsub broadcaster stub for daser - fx.Provide(func() shrexsub.BroadcastFn { - return func(context.Context, shrexsub.Notification) error { - return nil - } - }), ) default: panic("invalid node type") } } -func discoveryComponents(cfg *Config) fx.Option { - return 
fx.Options( - fx.Invoke(func(_ *disc.Discovery) {}), - fx.Provide(fx.Annotate( - newDiscovery(cfg.Discovery), - fx.OnStart(func(ctx context.Context, d *disc.Discovery) error { - return d.Start(ctx) - }), - fx.OnStop(func(ctx context.Context, d *disc.Discovery) error { - return d.Stop(ctx) - }), - )), - ) -} - -func peerManagerComponents(tp node.Type, cfg *Config) fx.Option { - switch tp { - case node.Full, node.Light: - return fx.Options( - fx.Provide(func() peers.Parameters { - return cfg.PeerManagerParams +func shrexComponents(tp node.Type, cfg *Config) fx.Option { + opts := fx.Options( + fx.Provide( + func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) { + return shrexsub.NewPubSub(ctx, h, network.String()) }), - fx.Provide( - func( - params peers.Parameters, - host host.Host, - connGater *conngater.BasicConnectionGater, - shrexSub *shrexsub.PubSub, - headerSub libhead.Subscriber[*header.ExtendedHeader], - // we must ensure Syncer is started before PeerManager - // so that Syncer registers header validator before PeerManager subscribes to headers - _ *sync.Syncer[*header.ExtendedHeader], - ) (*peers.Manager, error) { - return peers.NewManager( - params, - host, - connGater, - peers.WithShrexSubPools(shrexSub, headerSub), - ) - }, - ), - ) - case node.Bridge: - return fx.Provide(peers.NewManager) - default: - panic("invalid node type") - } -} - -func shrexSubComponents() fx.Option { - return fx.Provide( - func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) { - return shrexsub.NewPubSub(ctx, h, network.String()) - }, - ) -} - -// shrexGetterComponents provides components for a shrex getter that -// is capable of requesting -func shrexGetterComponents(cfg *Config) fx.Option { - return fx.Options( // shrex-nd client fx.Provide( func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) { @@ -175,8 +86,25 @@ func shrexGetterComponents(cfg *Config) fx.Option { }, ), + // shrex-getter fx.Provide(fx.Annotate( - getters.NewShrexGetter, + func( + edsClient *shrexeds.Client, + ndClient *shrexnd.Client, + managers map[string]*peers.Manager, + window pruner.AvailabilityWindow, + ) *getters.ShrexGetter { + return getters.NewShrexGetter( + edsClient, + ndClient, + managers[fullNodesTag], + managers[archivalNodesTag], + // TODO @renaynay: Pruned FNs should pass `light.Window` to shrex getter + // best route requests (as full.Window serves as a buffer for serving data while + // the request router itself should just stick to light.Window) + window, + ) + }, fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error { return getter.Start(ctx) }), @@ -185,6 +113,46 @@ func shrexGetterComponents(cfg *Config) fx.Option { }), )), ) + + switch tp { + case node.Light: + return fx.Options( + opts, + // shrexsub broadcaster stub for daser + fx.Provide(func() shrexsub.BroadcastFn { + return func(context.Context, shrexsub.Notification) error { + return nil + } + }), + ) + case node.Full: + return fx.Options( + opts, + shrexServerComponents(cfg), + fx.Provide(getters.NewStoreGetter), + fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn { + return shrexSub.Broadcast + }), + ) + case node.Bridge: + return fx.Options( + opts, + shrexServerComponents(cfg), + fx.Provide(getters.NewStoreGetter), + fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn { + return shrexSub.Broadcast + }), + fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error { + lc.Append(fx.Hook{ + OnStart: sub.Start, + OnStop: 
sub.Stop, + }) + return nil + }), + ) + default: + panic("invalid node type") + } } func shrexServerComponents(cfg *Config) fx.Option { @@ -241,38 +209,34 @@ func edsStoreComponents(cfg *Config) fx.Option { ) } -func fullAvailabilityComponents() fx.Option { - return fx.Options( - fx.Provide(fx.Annotate( - full.NewShareAvailability, - fx.OnStart(func(ctx context.Context, avail *full.ShareAvailability) error { - return avail.Start(ctx) - }), - fx.OnStop(func(ctx context.Context, avail *full.ShareAvailability) error { - return avail.Stop(ctx) +func availabilityComponents(tp node.Type, cfg *Config) fx.Option { + switch tp { + case node.Light: + return fx.Options( + fx.Provide(fx.Annotate( + func(getter share.Getter, ds datastore.Batching) *light.ShareAvailability { + return light.NewShareAvailability( + getter, + ds, + light.WithSampleAmount(cfg.LightAvailability.SampleAmount), + ) + }, + fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error { + return la.Close(ctx) + }), + )), + fx.Provide(func(avail *light.ShareAvailability) share.Availability { + return avail }), - )), - fx.Provide(func(avail *full.ShareAvailability) share.Availability { - return avail - }), - ) -} - -func lightAvailabilityComponents(cfg *Config) fx.Option { - return fx.Options( - fx.Provide(fx.Annotate( - light.NewShareAvailability, - fx.OnStop(func(ctx context.Context, la *light.ShareAvailability) error { - return la.Close(ctx) + ) + case node.Bridge, node.Full: + return fx.Options( + fx.Provide(full.NewShareAvailability), + fx.Provide(func(avail *full.ShareAvailability) share.Availability { + return avail }), - )), - fx.Provide(func() []light.Option { - return []light.Option{ - light.WithSampleAmount(cfg.LightAvailability.SampleAmount), - } - }), - fx.Provide(func(avail *light.ShareAvailability) share.Availability { - return avail - }), - ) + ) + default: + panic("invalid node type") + } } diff --git a/nodebuilder/share/opts.go b/nodebuilder/share/opts.go index e236847f41..9c122b7b0f 100644 --- a/nodebuilder/share/opts.go +++ b/nodebuilder/share/opts.go @@ -1,6 +1,8 @@ package share import ( + "errors" + "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/getters" disc "github.com/celestiaorg/celestia-node/share/p2p/discovery" @@ -11,14 +13,22 @@ import ( // WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is // expected to be "invoked" by the fx lifecycle. -func WithPeerManagerMetrics(m *peers.Manager) error { - return m.WithMetrics() +func WithPeerManagerMetrics(managers map[string]*peers.Manager) error { + var err error + for _, m := range managers { + err = errors.Join(err, m.WithMetrics()) + } + return err } // WithDiscoveryMetrics is a utility function to turn on discovery metrics and that is expected to // be "invoked" by the fx lifecycle. 
-func WithDiscoveryMetrics(d *disc.Discovery) error { - return d.WithMetrics() +func WithDiscoveryMetrics(discs []*disc.Discovery) error { + var err error + for _, disc := range discs { + err = errors.Join(err, disc.WithMetrics()) + } + return err } func WithShrexClientMetrics(edsClient *shrexeds.Client, ndClient *shrexnd.Client) error { diff --git a/nodebuilder/share/p2p_constructors.go b/nodebuilder/share/p2p_constructors.go new file mode 100644 index 0000000000..aae1a325aa --- /dev/null +++ b/nodebuilder/share/p2p_constructors.go @@ -0,0 +1,143 @@ +package share + +import ( + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/routing" + routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing" + "github.com/libp2p/go-libp2p/p2p/net/conngater" + "go.uber.org/fx" + + libhead "github.com/celestiaorg/go-header" + "github.com/celestiaorg/go-header/sync" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/nodebuilder/node" + modprune "github.com/celestiaorg/celestia-node/nodebuilder/pruner" + disc "github.com/celestiaorg/celestia-node/share/p2p/discovery" + "github.com/celestiaorg/celestia-node/share/p2p/peers" + "github.com/celestiaorg/celestia-node/share/p2p/shrexsub" +) + +const ( + // fullNodesTag is the tag used to identify full nodes in the discovery service. + fullNodesTag = "full" + // archivalNodesTag is the tag used to identify archival nodes in the + // discovery service. + archivalNodesTag = "archival" +) + +// TODO @renaynay: rename +func peerComponents(tp node.Type, cfg *Config) fx.Option { + return fx.Options( + fullDiscoveryAndPeerManager(tp, cfg), + archivalDiscoveryAndPeerManager(tp, cfg), + ) +} + +// fullDiscoveryAndPeerManager builds the discovery instance and peer manager +// for the `full` tag. Every node type (Light, Full, and Bridge) must discovery +// `full` nodes on the network. +func fullDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option { + return fx.Provide( + func( + lc fx.Lifecycle, + host host.Host, + r routing.ContentRouting, + connGater *conngater.BasicConnectionGater, + shrexSub *shrexsub.PubSub, + headerSub libhead.Subscriber[*header.ExtendedHeader], + // we must ensure Syncer is started before PeerManager + // so that Syncer registers header validator before PeerManager subscribes to headers + _ *sync.Syncer[*header.ExtendedHeader], + ) (*peers.Manager, *disc.Discovery, error) { + var managerOpts []peers.Option + if tp != node.Bridge { + // BNs do not need the overhead of shrexsub peer pools as + // BNs do not sync blocks off the DA network. 
+ managerOpts = append(managerOpts, peers.WithShrexSubPools(shrexSub, headerSub)) + } + + fullManager, err := peers.NewManager( + cfg.PeerManagerParams, + host, + connGater, + fullNodesTag, + managerOpts..., + ) + if err != nil { + return nil, nil, err + } + + discOpts := []disc.Option{disc.WithOnPeersUpdate(fullManager.UpdateNodePool)} + + if tp != node.Light { + // only FN and BNs should advertise to `full` topic + discOpts = append(discOpts, disc.WithAdvertise()) + } + + fullDisc, err := disc.NewDiscovery( + cfg.Discovery, + host, + routingdisc.NewRoutingDiscovery(r), + fullNodesTag, + discOpts..., + ) + if err != nil { + return nil, nil, err + } + lc.Append(fx.Hook{ + OnStart: fullDisc.Start, + OnStop: fullDisc.Stop, + }) + + return fullManager, fullDisc, nil + }) +} + +// archivalDiscoveryAndPeerManager TODO @renaynay +func archivalDiscoveryAndPeerManager(tp node.Type, cfg *Config) fx.Option { + return fx.Provide( + func( + lc fx.Lifecycle, + pruneCfg *modprune.Config, + d *disc.Discovery, + manager *peers.Manager, + h host.Host, + r routing.ContentRouting, + gater *conngater.BasicConnectionGater, + ) (map[string]*peers.Manager, []*disc.Discovery, error) { + archivalPeerManager, err := peers.NewManager( + cfg.PeerManagerParams, + h, + gater, + archivalNodesTag, + ) + if err != nil { + return nil, nil, err + } + + discOpts := []disc.Option{disc.WithOnPeersUpdate(archivalPeerManager.UpdateNodePool)} + + if (tp == node.Bridge || tp == node.Full) && !pruneCfg.EnableService { + discOpts = append(discOpts, disc.WithAdvertise()) + } + + archivalDisc, err := disc.NewDiscovery( + cfg.Discovery, + h, + routingdisc.NewRoutingDiscovery(r), + archivalNodesTag, + discOpts..., + ) + if err != nil { + return nil, nil, err + } + lc.Append(fx.Hook{ + OnStart: archivalDisc.Start, + OnStop: archivalDisc.Stop, + }) + + managers := map[string]*peers.Manager{fullNodesTag: manager, archivalNodesTag: archivalPeerManager} + return managers, []*disc.Discovery{d, archivalDisc}, nil + }) +} diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go new file mode 100644 index 0000000000..8c530cef02 --- /dev/null +++ b/nodebuilder/tests/prune_test.go @@ -0,0 +1,152 @@ +package tests + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "github.com/celestiaorg/celestia-node/blob" + "github.com/celestiaorg/celestia-node/nodebuilder" + "github.com/celestiaorg/celestia-node/nodebuilder/das" + "github.com/celestiaorg/celestia-node/nodebuilder/node" + "github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp" + "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/share" +) + +// TestArchivalBlobSync tests whether a LN is able to sync historical blobs from +// an archival node in a network dominated by pruned nodes. 
+// +// 1 BN w/ pruning, 3 FN w/ pruning, 1 FN archival + +// turn on archival BN +// archival FN syncs against BN +// turn off archival BN +// turn on pruning BN +// spin up 3 pruning FNs, connect +// spin up 1 LN that syncs historic blobs +func TestArchivalBlobSync(t *testing.T) { + const ( + blocks = 50 + btime = time.Millisecond * 300 + bsize = 16 + ) + + ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout) + t.Cleanup(cancel) + + sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime)) + fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks) + + archivalBN := sw.NewBridgeNode() + sw.SetBootstrapper(t, archivalBN) + + err := archivalBN.Start(ctx) + require.NoError(t, err) + + archivalFN := sw.NewFullNode() + err = archivalFN.Start(ctx) + require.NoError(t, err) + + require.NoError(t, <-fillDn) + + pruningCfg := nodebuilder.DefaultConfig(node.Bridge) + pruningCfg.Pruner.EnableService = true + + testAvailWindow := pruner.AvailabilityWindow(time.Millisecond) + prunerOpts := fx.Options( + fx.Replace(testAvailWindow), + ) + + // stop the archival BN to force LN to have to discover + // the archival FN later + err = archivalBN.Stop(ctx) + require.NoError(t, err) + + pruningBN := sw.NewNodeWithConfig(node.Bridge, pruningCfg, prunerOpts) + sw.SetBootstrapper(t, pruningBN) + err = pruningBN.Start(ctx) + require.NoError(t, err) + + err = archivalFN.Host.Connect(ctx, *host.InfoFromHost(pruningBN.Host)) + require.NoError(t, err) + + pruningCfg.DASer = das.DefaultConfig(node.Full) + pruningCfg.Pruner.EnableService = true + pruningFulls := make([]*nodebuilder.Node, 0, 3) + for i := 0; i < 3; i++ { + pruningFN := sw.NewNodeWithConfig(node.Full, pruningCfg, prunerOpts) + err = pruningFN.Start(ctx) + require.NoError(t, err) + + pruningFulls = append(pruningFulls, pruningFN) + } + + type archivalBlob struct { + blob *blob.Blob + height uint64 + root share.DataHash + } + + archivalBlobs := make([]*archivalBlob, 0) + i := 1 + for { + eh, err := archivalFN.HeaderServ.GetByHeight(ctx, uint64(i)) + require.NoError(t, err) + + if bytes.Equal(eh.DataHash, share.EmptyRoot().Hash()) { + i++ + continue + } + + shr, err := archivalFN.ShareServ.GetShare(ctx, eh, 2, 2) + require.NoError(t, err) + ns, err := share.NamespaceFromBytes(shr[:share.NamespaceSize]) + require.NoError(t, err) + + blobs, err := archivalFN.BlobServ.GetAll(ctx, uint64(i), []share.Namespace{ns}) + require.NoError(t, err) + + archivalBlobs = append(archivalBlobs, &archivalBlob{ + blob: blobs[0], + height: uint64(i), + root: eh.DAH.Hash(), + }) + + if len(archivalBlobs) > 10 { + break + } + i++ + } + + // ensure pruned FNs don't have the blocks associated + // with the historical blobs + for _, pruned := range pruningFulls { + for _, b := range archivalBlobs { + has, err := pruned.EDSStore.Has(ctx, b.root) + require.NoError(t, err) + assert.False(t, has) + } + } + + ln := sw.NewLightNode(prunerOpts) + err = ln.Start(ctx) + require.NoError(t, err) + + // ensure LN can retrieve all archival blobs from the + // archival FN + for _, b := range archivalBlobs { + _, err = ln.HeaderServ.WaitForHeight(ctx, b.height) + require.NoError(t, err) + got, err := ln.BlobServ.Get(ctx, b.height, b.blob.Namespace(), b.blob.Commitment) + require.NoError(t, err) + assert.Equal(t, b.blob.Commitment, got.Commitment) + assert.Equal(t, b.blob.Data, got.Data) + } +} diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 4ea211cb1e..573819dc9f 100644 --- 
a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -13,7 +13,6 @@ import ( "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/p2p/discovery" ) var log = logging.Logger("share/full") @@ -24,37 +23,19 @@ var log = logging.Logger("share/full") type ShareAvailability struct { store *eds.Store getter share.Getter - disc *discovery.Discovery - - cancel context.CancelFunc } // NewShareAvailability creates a new full ShareAvailability. func NewShareAvailability( store *eds.Store, getter share.Getter, - disc *discovery.Discovery, ) *ShareAvailability { return &ShareAvailability{ store: store, getter: getter, - disc: disc, } } -func (fa *ShareAvailability) Start(context.Context) error { - ctx, cancel := context.WithCancel(context.Background()) - fa.cancel = cancel - - go fa.disc.Advertise(ctx) - return nil -} - -func (fa *ShareAvailability) Stop(context.Context) error { - fa.cancel() - return nil -} - // SharesAvailable reconstructs the data committed to the given Root by requesting // enough Shares from the network. func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header.ExtendedHeader) error { diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index 46e97581f2..7379c83441 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -6,8 +6,6 @@ import ( "time" "github.com/ipfs/go-datastore" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" - "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/share" @@ -44,13 +42,7 @@ func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { params := discovery.DefaultParameters() params.AdvertiseInterval = time.Second params.PeersLimit = 10 - disc, err := discovery.NewDiscovery( - params, - nil, - routing.NewRoutingDiscovery(routinghelpers.Null{}), - "full", - ) - require.NoError(t, err) + store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) require.NoError(t, err) err = store.Start(context.Background()) @@ -60,5 +52,5 @@ func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { err = store.Stop(context.Background()) require.NoError(t, err) }) - return NewShareAvailability(store, getter, disc) + return NewShareAvailability(store, getter) } diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 826c6b1a10..c59f5dbf6c 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + libpeer "github.com/libp2p/go-libp2p/core/peer" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -15,6 +16,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/libs/utils" + "github.com/celestiaorg/celestia-node/pruner" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/celestia-node/share/p2p" @@ -88,7 +90,8 @@ type ShrexGetter struct { edsClient *shrexeds.Client ndClient *shrexnd.Client - peerManager *peers.Manager + fullPeerManager *peers.Manager + archivalPeerManager *peers.Manager // minRequestTimeout limits minimal timeout given to single peer by getter for serving the request. 
minRequestTimeout time.Duration @@ -96,25 +99,45 @@ type ShrexGetter struct { // attempt multiple peers in scope of one request before context timeout is reached minAttemptsCount int + availabilityWindow pruner.AvailabilityWindow + metrics *metrics } -func NewShrexGetter(edsClient *shrexeds.Client, ndClient *shrexnd.Client, peerManager *peers.Manager) *ShrexGetter { - return &ShrexGetter{ - edsClient: edsClient, - ndClient: ndClient, - peerManager: peerManager, - minRequestTimeout: defaultMinRequestTimeout, - minAttemptsCount: defaultMinAttemptsCount, +func NewShrexGetter( + edsClient *shrexeds.Client, + ndClient *shrexnd.Client, + fullPeerManager *peers.Manager, + archivalManager *peers.Manager, + availWindow pruner.AvailabilityWindow, +) *ShrexGetter { + s := &ShrexGetter{ + edsClient: edsClient, + ndClient: ndClient, + fullPeerManager: fullPeerManager, + archivalPeerManager: archivalManager, + minRequestTimeout: defaultMinRequestTimeout, + minAttemptsCount: defaultMinAttemptsCount, + availabilityWindow: availWindow, } + + return s } func (sg *ShrexGetter) Start(ctx context.Context) error { - return sg.peerManager.Start(ctx) + err := sg.fullPeerManager.Start(ctx) + if err != nil { + return err + } + return sg.archivalPeerManager.Start(ctx) } func (sg *ShrexGetter) Stop(ctx context.Context) error { - return sg.peerManager.Stop(ctx) + err := sg.fullPeerManager.Stop(ctx) + if err != nil { + return err + } + return sg.archivalPeerManager.Stop(ctx) } func (sg *ShrexGetter) GetShare(context.Context, *header.ExtendedHeader, int, int) (share.Share, error) { @@ -122,10 +145,7 @@ func (sg *ShrexGetter) GetShare(context.Context, *header.ExtendedHeader, int, in } func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { - var ( - attempt int - err error - ) + var err error ctx, span := tracer.Start(ctx, "shrex/get-eds") defer func() { utils.SetStatusAndEnd(span, err) @@ -135,6 +155,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader if header.DAH.Equals(share.EmptyRoot()) { return share.EmptyExtendedDataSquare(), nil } + + var attempt int for { if ctx.Err() != nil { sg.metrics.recordEDSAttempt(ctx, attempt, false) @@ -142,7 +164,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader } attempt++ start := time.Now() - peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height()) + + peer, setStatus, getErr := sg.getPeer(ctx, header) if getErr != nil { log.Debugw("eds: couldn't find peer", "hash", header.DAH.String(), @@ -218,7 +241,8 @@ func (sg *ShrexGetter) GetSharesByNamespace( } attempt++ start := time.Now() - peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height()) + + peer, setStatus, getErr := sg.getPeer(ctx, header) if getErr != nil { log.Debugw("nd: couldn't find peer", "hash", dah.String(), @@ -268,3 +292,14 @@ func (sg *ShrexGetter) GetSharesByNamespace( "finished (s)", time.Since(reqStart)) } } + +func (sg *ShrexGetter) getPeer( + ctx context.Context, + header *header.ExtendedHeader, +) (libpeer.ID, peers.DoneFunc, error) { + if !pruner.IsWithinAvailabilityWindow(header.Time(), sg.availabilityWindow) { + p, df, err := sg.archivalPeerManager.Peer(ctx, header.DAH.Hash(), header.Height()) + return p, df, err + } + return sg.fullPeerManager.Peer(ctx, header.DAH.Hash(), header.Height()) +} diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index 74896e6c15..e572cc48ba 100644 --- 
a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/p2p/net/conngater" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" libhead "github.com/celestiaorg/go-header" @@ -20,6 +21,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" + "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/edstest" @@ -51,9 +53,13 @@ func TestShrexGetter(t *testing.T) { // create shrex Getter sub := new(headertest.Subscriber) - peerManager, err := testManager(ctx, clHost, sub) + + fullPeerManager, err := testManager(ctx, clHost, sub) + require.NoError(t, err) + archivalPeerManager, err := testManager(ctx, clHost, sub) require.NoError(t, err) - getter := NewShrexGetter(edsClient, ndClient, peerManager) + + getter := NewShrexGetter(edsClient, ndClient, fullPeerManager, archivalPeerManager, full.Window) require.NoError(t, getter.Start(ctx)) t.Run("ND_Available, total data size > 1mb", func(t *testing.T) { @@ -65,7 +71,7 @@ func TestShrexGetter(t *testing.T) { randEDS, dah := edstest.RandEDSWithNamespace(t, namespace, 64) eh := headertest.RandExtendedHeaderWithRoot(t, dah) require.NoError(t, edsStore.Put(ctx, dah.Hash(), randEDS)) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -82,7 +88,7 @@ func TestShrexGetter(t *testing.T) { // generate test data _, dah, namespace := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, dah) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -99,7 +105,7 @@ func TestShrexGetter(t *testing.T) { eds, dah, maxNamespace := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, dah) require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds)) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -124,7 +130,7 @@ func TestShrexGetter(t *testing.T) { eds, dah, maxNamespace := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, dah) require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds)) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -149,7 +155,7 @@ func TestShrexGetter(t *testing.T) { randEDS, dah, _ := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, dah) require.NoError(t, edsStore.Put(ctx, dah.Hash(), randEDS)) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -165,7 +171,7 @@ func TestShrexGetter(t *testing.T) { // generate test data _, dah, _ := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, dah) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -182,7 +188,7 @@ func TestShrexGetter(t *testing.T) { // 
generate test data _, dah, _ := generateTestEDS(t) eh := headertest.RandExtendedHeaderWithRoot(t, dah) - peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ + fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: dah.Hash(), Height: 1, }) @@ -190,6 +196,35 @@ func TestShrexGetter(t *testing.T) { _, err := getter.GetEDS(ctx, eh) require.ErrorIs(t, err, share.ErrNotFound) }) + + // tests getPeer's ability to route requests based on whether + // they are historical or not + t.Run("routing_historical_requests", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + archivalPeer, err := net.GenPeer() + require.NoError(t, err) + fullPeer, err := net.GenPeer() + require.NoError(t, err) + + getter.archivalPeerManager.UpdateNodePool(archivalPeer.ID(), true) + getter.fullPeerManager.UpdateNodePool(fullPeer.ID(), true) + + eh := headertest.RandExtendedHeader(t) + + // historical data expects an archival peer + eh.RawHeader.Time = time.Now().Add(-(time.Duration(full.Window) + time.Second)) + id, _, err := getter.getPeer(ctx, eh) + require.NoError(t, err) + assert.Equal(t, archivalPeer.ID(), id) + + // recent (within sampling window) data expects a full peer + eh.RawHeader.Time = time.Now() + id, _, err = getter.getPeer(ctx, eh) + require.NoError(t, err) + assert.Equal(t, fullPeer.ID(), id) + }) } func newStore(t *testing.T) (*eds.Store, error) { @@ -223,6 +258,7 @@ func testManager( peers.DefaultParameters(), host, connGater, + "test", peers.WithShrexSubPools(shrexSub, headerSub), ) return manager, err diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index f2ca04bbbe..90aea56558 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -42,14 +42,18 @@ var discoveryRetryTimeout = retryTimeout // Discovery combines advertise and discover services and allows to store discovered nodes. 
// TODO: The code here gets horribly hairy, so we should refactor this at some point type Discovery struct { - // Tag is used as rondezvous point for discovery service + // Tag is used as rendezvous point for discovery service tag string set *limitedSet host host.Host disc discovery.Discovery connector *backoffConnector + // onUpdatedPeers will be called on peer set changes onUpdatedPeers OnUpdatedPeers + // indicates whether the discovery instance should also advertise + // to the topic + advertise bool triggerDisc chan struct{} @@ -92,6 +96,7 @@ func NewDiscovery( disc: d, connector: newBackoffConnector(h, defaultBackoffFactory), onUpdatedPeers: o.onUpdatedPeers, + advertise: o.advertise, params: params, triggerDisc: make(chan struct{}), }, nil @@ -109,6 +114,12 @@ func (d *Discovery) Start(context.Context) error { go d.discoveryLoop(ctx) go d.disconnectsLoop(ctx, sub) go d.connector.GC(ctx) + + if d.advertise { + log.Infow("advertising to topic", "topic", d.tag) + go d.Advertise(ctx) + } + return nil } @@ -158,6 +169,7 @@ func (d *Discovery) Advertise(ctx context.Context) { timer := time.NewTimer(d.params.AdvertiseInterval) defer timer.Stop() for { + log.Debugf("advertising to topic %s", d.tag) _, err := d.disc.Advertise(ctx, d.tag) d.metrics.observeAdvertise(ctx, err) if err != nil { @@ -182,7 +194,7 @@ func (d *Discovery) Advertise(ctx context.Context) { } } - log.Debugf("advertised") + log.Debugf("successfully advertised to topic %s", d.tag) if !timer.Stop() { <-timer.C } @@ -257,13 +269,13 @@ func (d *Discovery) discover(ctx context.Context) bool { size := d.set.Size() want := d.set.Limit() - size if want == 0 { - log.Debugw("reached soft peer limit, skipping discovery", "size", size) + log.Debugw("reached soft peer limit, skipping discovery", "topic", d.tag, "size", size) return true } // TODO @renaynay: eventually, have a mechanism to catch if wanted amount of peers // has not been discovered in X amount of time so that users are warned of degraded // FN connectivity. 
- log.Debugw("discovering peers", "want", want) + log.Debugw("discovering peers", "topic", d.tag, "want", want) // we use errgroup as it provide limits var wg errgroup.Group @@ -279,7 +291,7 @@ func (d *Discovery) discover(ctx context.Context) bool { peers, err := d.disc.FindPeers(findCtx, d.tag) if err != nil { - log.Error("unable to start discovery", "err", err) + log.Error("unable to start discovery", "topic", d.tag, "err", err) return false } @@ -304,12 +316,12 @@ func (d *Discovery) discover(ctx context.Context) bool { } size := d.set.Size() - log.Debugw("found peer", "peer", peer.ID.String(), "found_amount", size) + log.Debugw("found peer", "topic", d.tag, "peer", peer.ID.String(), "found_amount", size) if size < d.set.Limit() { return nil } - log.Infow("discovered wanted peers", "amount", size) + log.Infow("discovered wanted peers", "topic", d.tag, "amount", size) findCancel() // stop discovery when we are done return nil }) @@ -320,7 +332,7 @@ func (d *Discovery) discover(ctx context.Context) bool { isEnoughPeers := d.set.Size() >= d.set.Limit() d.metrics.observeFindPeers(ctx, isEnoughPeers) - log.Debugw("discovery finished", "discovered_wanted", isEnoughPeers) + log.Debugw("discovery finished", "topic", d.tag, "discovered_wanted", isEnoughPeers) return isEnoughPeers } } diff --git a/share/p2p/discovery/metrics.go b/share/p2p/discovery/metrics.go index 5847fbdd90..e069420988 100644 --- a/share/p2p/discovery/metrics.go +++ b/share/p2p/discovery/metrics.go @@ -53,43 +53,43 @@ func (d *Discovery) WithMetrics() error { } func initMetrics(d *Discovery) (*metrics, error) { - peersAmount, err := meter.Int64ObservableGauge("discovery_amount_of_peers", + peersAmount, err := meter.Int64ObservableGauge(d.tag+"_discovery_amount_of_peers", metric.WithDescription("amount of peers in discovery set")) if err != nil { return nil, err } - discoveryResult, err := meter.Int64Counter("discovery_find_peers_result", + discoveryResult, err := meter.Int64Counter(d.tag+"_discovery_find_peers_result", metric.WithDescription("result of find peers run")) if err != nil { return nil, err } - handlePeerResultCounter, err := meter.Int64Counter("discovery_handler_peer_result", + handlePeerResultCounter, err := meter.Int64Counter(d.tag+"_discovery_handler_peer_result", metric.WithDescription("result handling found peer")) if err != nil { return nil, err } - advertise, err := meter.Int64Counter("discovery_advertise_event", + advertise, err := meter.Int64Counter(d.tag+"_discovery_advertise_event", metric.WithDescription("advertise events counter")) if err != nil { return nil, err } - peerAdded, err := meter.Int64Counter("discovery_add_peer", + peerAdded, err := meter.Int64Counter(d.tag+"_discovery_add_peer", metric.WithDescription("add peer to discovery set counter")) if err != nil { return nil, err } - peerRemoved, err := meter.Int64Counter("discovery_remove_peer", + peerRemoved, err := meter.Int64Counter(d.tag+"_discovery_remove_peer", metric.WithDescription("remove peer from discovery set counter")) if err != nil { return nil, err } - backOffSize, err := meter.Int64ObservableGauge("discovery_backoff_amount", + backOffSize, err := meter.Int64ObservableGauge(d.tag+"_discovery_backoff_amount", metric.WithDescription("amount of peers in backoff")) if err != nil { return nil, err diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go index de4b13a7db..8515bcbe00 100644 --- a/share/p2p/discovery/options.go +++ b/share/p2p/discovery/options.go @@ -13,7 +13,6 @@ type Parameters struct { // Set 0 to 
disable. PeersLimit uint // AdvertiseInterval is a interval between advertising sessions. - // Set -1 to disable. // NOTE: only full and bridge can advertise themselves. AdvertiseInterval time.Duration } @@ -22,6 +21,9 @@ type Parameters struct { type options struct { // onUpdatedPeers will be called on peer set changes onUpdatedPeers OnUpdatedPeers + // advertise indicates whether the node should also + // advertise to the discovery instance's topic + advertise bool } // Option is a function that configures Discovery Parameters @@ -55,9 +57,16 @@ func WithOnPeersUpdate(f OnUpdatedPeers) Option { } } +func WithAdvertise() Option { + return func(p *options) { + p.advertise = true + } +} + func newOptions(opts ...Option) *options { defaults := &options{ onUpdatedPeers: func(peer.ID, bool) {}, + advertise: false, } for _, opt := range opts { diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index ca85a85ea6..0d2f6ac42b 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -52,6 +52,10 @@ type Manager struct { lock sync.Mutex params Parameters + // identifies the type of peers the manager + // is managing + tag string + // header subscription is necessary in order to Validate the inbound eds hash headerSub libhead.Subscriber[*header.ExtendedHeader] shrexSub *shrexsub.PubSub @@ -100,6 +104,7 @@ func NewManager( params Parameters, host host.Host, connGater *conngater.BasicConnectionGater, + tag string, options ...Option, ) (*Manager, error) { if err := params.Validate(); err != nil { @@ -114,6 +119,7 @@ func NewManager( blacklistedHashes: make(map[string]bool), headerSubDone: make(chan struct{}), disconnectedPeersDone: make(chan struct{}), + tag: tag, } for _, opt := range options { @@ -213,7 +219,7 @@ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint // obtained from discovery peerID, ok = m.nodes.tryGet() if ok { - return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.nodes.len(), 0) + return m.newPeer(ctx, datahash, peerID, sourceDiscoveredNodes, m.nodes.len(), 0) } // no peers are available right now, wait for the first one @@ -225,7 +231,7 @@ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint } return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.nodes.next(ctx): - return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.nodes.len(), time.Since(start)) + return m.newPeer(ctx, datahash, peerID, sourceDiscoveredNodes, m.nodes.len(), time.Since(start)) case <-ctx.Done(): return "", nil, ctx.Err() } @@ -276,7 +282,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS switch result { case ResultNoop: case ResultCooldownPeer: - if source == sourceFullNodes { + if source == sourceDiscoveredNodes { m.nodes.putOnCooldown(peerID) return } diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 7ca6b5eb0b..2a465dc59a 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -414,6 +414,7 @@ func TestIntegration(t *testing.T) { DefaultParameters(), nil, connGater, + "test", ) require.NoError(t, err) @@ -475,6 +476,7 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten DefaultParameters(), host, connGater, + "test", WithShrexSubPools(shrexSub, headerSub), ) if err != nil { diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index da52856425..b28b263127 100644 --- a/share/p2p/peers/metrics.go +++ 
b/share/p2p/peers/metrics.go @@ -20,9 +20,9 @@ const ( isInstantKey = "is_instant" doneResultKey = "done_result" - sourceKey = "source" - sourceShrexSub peerSource = "shrexsub" - sourceFullNodes peerSource = "full_nodes" + sourceKey = "source" + sourceShrexSub peerSource = "shrexsub" + sourceDiscoveredNodes peerSource = "discovered_nodes" blacklistPeerReasonKey = "blacklist_reason" reasonInvalidHash blacklistPeerReason = "invalid_hash" @@ -65,7 +65,7 @@ type metrics struct { validationResult metric.Int64Counter // attributes: validation_result shrexPools metric.Int64ObservableGauge // attributes: pool_status - fullNodesPool metric.Int64ObservableGauge // attributes: pool_status + discoveredPool metric.Int64ObservableGauge // attributes: pool_status blacklistedPeersByReason sync.Map blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason @@ -73,49 +73,54 @@ type metrics struct { } func initMetrics(manager *Manager) (*metrics, error) { - getPeer, err := meter.Int64Counter("peer_manager_get_peer_counter", + getPeer, err := meter.Int64Counter(manager.tag+"_peer_manager_get_peer_counter", metric.WithDescription("get peer counter")) if err != nil { return nil, err } - getPeerWaitTimeHistogram, err := meter.Int64Histogram("peer_manager_get_peer_ms_time_hist", + getPeerWaitTimeHistogram, err := meter.Int64Histogram( + manager.tag+"_peer_manager_get_peer_ms_time_hist", metric.WithDescription("get peer time histogram(ms), observed only for async get(is_instant = false)")) if err != nil { return nil, err } - getPeerPoolSizeHistogram, err := meter.Int64Histogram("peer_manager_get_peer_pool_size_hist", + getPeerPoolSizeHistogram, err := meter.Int64Histogram( + manager.tag+"_peer_manager_get_peer_pool_size_hist", metric.WithDescription("amount of available active peers in pool at time when get was called")) if err != nil { return nil, err } - doneResult, err := meter.Int64Counter("peer_manager_done_result_counter", + doneResult, err := meter.Int64Counter(manager.tag+"_peer_manager_done_result_counter", metric.WithDescription("done results counter")) if err != nil { return nil, err } - validationResult, err := meter.Int64Counter("peer_manager_validation_result_counter", + validationResult, err := meter.Int64Counter( + manager.tag+"_peer_manager_validation_result_counter", metric.WithDescription("validation result counter")) if err != nil { return nil, err } - shrexPools, err := meter.Int64ObservableGauge("peer_manager_pools_gauge", + shrexPools, err := meter.Int64ObservableGauge(manager.tag+"_peer_manager_pools_gauge", metric.WithDescription("pools amount")) if err != nil { return nil, err } - fullNodesPool, err := meter.Int64ObservableGauge("peer_manager_full_nodes_gauge", - metric.WithDescription("full nodes pool peers amount")) + discoveredPool, err := meter.Int64ObservableGauge( + manager.tag+"_peer_manager_discovered_nodes_gauge", + metric.WithDescription("discovered nodes pool peers amount")) if err != nil { return nil, err } - blacklisted, err := meter.Int64ObservableGauge("peer_manager_blacklisted_peers", + blacklisted, err := meter.Int64ObservableGauge( + manager.tag+"peer_manager_blacklisted_peers", metric.WithDescription("blacklisted peers amount")) if err != nil { return nil, err @@ -127,7 +132,7 @@ func initMetrics(manager *Manager) (*metrics, error) { doneResult: doneResult, validationResult: validationResult, shrexPools: shrexPools, - fullNodesPool: fullNodesPool, + discoveredPool: discoveredPool, getPeerPoolSizeHistogram: getPeerPoolSizeHistogram, blacklistedPeers: 
blacklisted,
 	}
@@ -139,10 +144,10 @@ func initMetrics(manager *Manager) (*metrics, error) {
 				attribute.String(poolStatusKey, string(poolStatus))))
 		}
 
-		observer.ObserveInt64(fullNodesPool, int64(manager.nodes.len()),
+		observer.ObserveInt64(discoveredPool, int64(manager.nodes.len()),
 			metric.WithAttributes(
 				attribute.String(peerStatusKey, string(peerStatusActive))))
-		observer.ObserveInt64(fullNodesPool, int64(manager.nodes.cooldown.len()),
+		observer.ObserveInt64(discoveredPool, int64(manager.nodes.cooldown.len()),
 			metric.WithAttributes(
 				attribute.String(peerStatusKey, string(peerStatusCooldown))))
 
@@ -156,7 +161,7 @@ func initMetrics(manager *Manager) (*metrics, error) {
 		})
 		return nil
 	}
-	metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
+	metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, discoveredPool, blacklisted)
 	if err != nil {
 		return nil, fmt.Errorf("registering metrics callback: %w", err)
 	}
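
Note on the availability-window check this patch relies on: the diff calls pruner.IsWithinAvailabilityWindow and passes a pruner.AvailabilityWindow through DASer.sample, ShrexGetter.getPeer, and the nodebuilder wiring, but the pruner package itself is not part of the diff. The sketch below is a minimal, standalone approximation of that logic, inferred from the removed DASer.isWithinSamplingWindow and from the test usages above (AvailabilityWindow being a time.Duration-based type, zero meaning "no window"); it is not the actual pruner implementation, and routePeerSet is a hypothetical stand-in for the peer-manager selection in ShrexGetter.getPeer.

package main

import (
	"fmt"
	"time"
)

// AvailabilityWindow approximates pruner.AvailabilityWindow: how far back from
// "now" data is still expected to be retrievable from non-archival peers.
type AvailabilityWindow time.Duration

// IsWithinAvailabilityWindow reports whether a header timestamp falls inside
// the window. A zero window means "no window", so every header counts as
// recent (mirroring the removed isWithinSamplingWindow behaviour).
func IsWithinAvailabilityWindow(t time.Time, window AvailabilityWindow) bool {
	if window == 0 {
		return true
	}
	return time.Since(t) <= time.Duration(window)
}

// routePeerSet mimics the decision ShrexGetter.getPeer makes in this diff:
// headers inside the window are served from the "full" peer manager, older
// headers from the peer manager built on the archival discovery tag.
func routePeerSet(headerTime time.Time, window AvailabilityWindow) string {
	if IsWithinAvailabilityWindow(headerTime, window) {
		return "full"
	}
	return "archival"
}

func main() {
	// Hypothetical 30-day window, standing in for full.Window.
	window := AvailabilityWindow(30 * 24 * time.Hour)

	recent := time.Now().Add(-2 * time.Hour)
	historical := time.Now().Add(-90 * 24 * time.Hour)

	fmt.Println(routePeerSet(recent, window))     // full
	fmt.Println(routePeerSet(historical, window)) // archival
}

Used this way, a pruned full node keeps serving requests for data inside the window, while requests for older headers are routed to peers discovered under the archival tag, which is the behaviour TestArchivalBlobSync exercises end to end.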