Skip to content

Commit

Permalink
feat(p2p/discovery)!: Implement archival discovery + syncing of his…
Browse files Browse the repository at this point in the history
…toric blocks and blobs (#3188)

This PR introduces the discovery of nodes advertising on the `archival`
topic for the purpose of being able to request blobs and blocks that are
older than the sampling window.

In order to do so, there are now 2 instances of discovery (one for
`full` nodes -- _all nodes running full block sync within the sampling
window_, and one for `archival` nodes -- _nodes that sync, store and
serve all historic data_) and 2 instances of peer managers (one for
`full` and one for `archival`).

**This PR is _breaking_ due to changes to metrics names.**

Please also note that this PR introduces the ability to perform archival
sync of blobs + blocks for **LIGHT AND FULL** nodes only, regardless of
whether the full nodes have pruning enabled or not. **Bridge nodes that
have previously had pruning enabled will NOT be able to request historic
data from the consensus core node.** The bridge node must run in
archival mode (without `--experimental-pruning` enabled) in order to
access historic data.

**Bridge and Full nodes in archival mode (without
`--experimental-pruning`)**
```
2024-05-10T15:21:44.490+0200	INFO	share/discovery	discovery/manager.go:30	starting discovery	{"topic": "full"}
2024-05-10T15:21:44.491+0200	INFO	share/discovery	discovery/manager.go:33	advertising to topic	{"topic": "full"}
2024-05-10T15:21:44.491+0200	INFO	share/discovery	discovery/manager.go:30	starting discovery	{"topic": "archival"}
2024-05-10T15:21:44.491+0200	INFO	share/discovery	discovery/manager.go:33	advertising to topic	{"topic": "archival"}
```


**Bridge and Full nodes running `--experimental-pruning`**
```
2024-05-10T15:19:20.698+0200	INFO	share/discovery	discovery/manager.go:30	starting discovery	{"topic": "full"}
2024-05-10T15:19:20.698+0200	INFO	share/discovery	discovery/manager.go:33	advertising to topic	{"topic": "full"}
2024-05-10T15:19:20.698+0200	INFO	share/discovery	discovery/manager.go:30	starting discovery	{"topic": "archival"}
```

**Light nodes**
```
2024-05-10T15:20:42.440+0200	INFO	share/discovery	discovery/manager.go:30	starting discovery	{"topic": "full"}
2024-05-10T15:20:42.440+0200	INFO	share/discovery	discovery/manager.go:30	starting discovery	{"topic": "archival"}
```


-----------------------------------------------------------

**TODO**
- [x] test archival request routing per node type
- [x] renaming some things inside discovery away from being
`full`-node-centric
- [x] rebase
- [x] swamp test

For follow-up PR:
- [x] clarify / make disc metrics generic as there will now be 2
instances of discovery happening

Nice to have:
- [ ] benchmarks for LN
  • Loading branch information
renaynay authored and ramin committed Jun 6, 2024
1 parent 20246bd commit 19f7610
Show file tree
Hide file tree
Showing 19 changed files with 634 additions and 305 deletions.
12 changes: 2 additions & 10 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -14,6 +13,7 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error {
func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !d.isWithinSamplingWindow(h) {
if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
Expand All @@ -183,14 +183,6 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
return nil
}

// isWithinSamplingWindow reports whether the header's timestamp falls
// inside the DASer's configured sampling window. A zero-duration window
// means the window is unset, in which case every header qualifies.
func (d *DASer) isWithinSamplingWindow(eh *header.ExtendedHeader) bool {
	window := d.params.samplingWindow.Duration()
	if window == 0 {
		// no window configured -> sample everything
		return true
	}
	return time.Since(eh.Time()) <= window
}

// SamplingStats returns the current statistics over the DA sampling process.
func (d *DASer) SamplingStats(ctx context.Context) (SamplingStats, error) {
return d.sampler.stats(ctx)
Expand Down
6 changes: 5 additions & 1 deletion das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,11 @@ func TestDASer_SamplingWindow(t *testing.T) {
eh := headertest.RandExtendedHeader(t)
eh.RawHeader.Time = tt.timestamp

assert.Equal(t, tt.withinWindow, daser.isWithinSamplingWindow(eh))
assert.Equal(
t,
tt.withinWindow,
pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow),
)
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions nodebuilder/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
"github.com/celestiaorg/celestia-node/share/eds"
)

var (
Expand All @@ -56,6 +57,9 @@ type Node struct {
RPCServer *rpc.Server // not optional
GatewayServer *gateway.Server `optional:"true"`

// block store
EDSStore *eds.Store `optional:"true"`

// p2p components
Host host.Host
ConnGater *conngater.BasicConnectionGater
Expand Down
89 changes: 49 additions & 40 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,13 @@ import (
)

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
if !cfg.EnableService {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Options(
fx.Supply(light.Window),
)
case node.Full:
return fx.Options(
fx.Supply(archival.Window),
)
case node.Bridge:
return fx.Options(
fx.Supply(archival.Window),
fx.Provide(func() []core.Option {
return []core.Option{}
}),
)
default:
panic("unknown node type")
}
}

baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
)

fullAndBridgeComponents := fx.Options(
baseComponents,
fx.Provide(fx.Annotate(
newPrunerService,
fx.OnStart(func(ctx context.Context, p *pruner.Service) error {
Expand All @@ -52,34 +34,61 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
)

switch tp {
case node.Full:
// TODO: Eventually, light nodes will be capable of pruning samples
// in which case, this can be enabled.
case node.Light:
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
)
case node.Full:
opts := baseComponents
if cfg.EnableService {
opts = fullAndBridgeComponents
}
return fx.Module("prune",
opts,
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
fullAndBridgeComponents,
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
}),
)
}
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
fx.Provide(func() []core.Option {
return []core.Option{}
}),
)
// TODO: Eventually, light nodes will be capable of pruning samples
// in which case, this can be enabled.
default:
panic("unknown node type")
}
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
return fx.Module("prune",
fx.Supply(light.Window),
)
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() pruner.AvailabilityWindow {
return light.Window
})
case node.Full, node.Bridge:
return fx.Provide(func() pruner.AvailabilityWindow {
if pruneEnabled {
return full.Window
}
return archival.Window
})
default:
panic("unknown node type")
}
Expand Down
27 changes: 0 additions & 27 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,15 @@ import (

"github.com/filecoin-project/dagstore"
"github.com/ipfs/boxo/blockservice"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"

"github.com/celestiaorg/celestia-app/pkg/da"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

const (
// fullNodesTag is the tag used to identify full nodes in the discovery service.
fullNodesTag = "full"
)

// newDiscovery returns a constructor for a discovery instance that
// advertises on the full-nodes topic and forwards peer-set updates to
// the supplied peer manager.
func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
	return func(
		r routing.ContentRouting,
		h host.Host,
		manager *peers.Manager,
	) (*disc.Discovery, error) {
		// wrap the content router so it satisfies the libp2p discovery interface
		routingDiscovery := routingdisc.NewRoutingDiscovery(r)
		return disc.NewDiscovery(
			cfg,
			h,
			routingDiscovery,
			fullNodesTag,
			disc.WithOnPeersUpdate(manager.UpdateNodePool),
		)
	}
}

// newShareModule assembles the share Module from its getter and
// availability components.
func newShareModule(getter share.Getter, avail share.Availability) Module {
	m := &module{getter, avail}
	return m
}
Expand Down
Loading

0 comments on commit 19f7610

Please sign in to comment.