Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p/discovery)!: Implement archival discovery + syncing of historic blocks and blobs #3188

Merged
merged 28 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7a45fb8
archival components in nodebuilder
renaynay Feb 14, 2024
e3543a0
chore: lint + rebase fixes
renaynay Mar 18, 2024
f39b24f
chore(das): clean up daser recency check
renaynay Mar 18, 2024
8b9576c
chores
renaynay Apr 11, 2024
0e6338f
fix(share/getters): shrex getter should check if root is within LIGHT…
renaynay Apr 22, 2024
d49f927
feat(share/getters): Use light window as the default avail window for…
renaynay Apr 22, 2024
8292729
test(nodebuilder/tests): Archival blob sync + the most convoluted fx …
renaynay May 3, 2024
d07c127
chore: cleanup
renaynay May 8, 2024
7c8b369
fix(share/p2p): Adjust metrics with tag per instance of discovery -- …
renaynay May 8, 2024
3ff1176
fix(nodebuilder/share): invoke metrics for both peermans
renaynay May 8, 2024
8cb52f1
feat(share/discovery): implement discovery manager to clarify lc mana…
renaynay May 10, 2024
e9cd60f
fix(share/p2p/discovery): return err instantly if manager fails to st…
renaynay May 15, 2024
86b03be
chore(das): fix test after rebase
renaynay May 16, 2024
3e82c95
chore: more fixes from rebase
renaynay May 16, 2024
3a82abe
addresing Vlad commentws
renaynay May 16, 2024
6fd8d3f
chore(nodebuilder/pruner): simplify provides
renaynay May 16, 2024
e78b621
chore(share/p2p): refactor metrics tags to sipmlify
renaynay May 16, 2024
16e7c0e
nit: use ctx from Start for disc.Start even though its ignored
renaynay May 16, 2024
47be07f
doc(nodebuilder/tests): Add doc to pruner test
renaynay May 16, 2024
6df5026
fix(nodebuilder/share): disable advertise for pruned FN and BN for ar…
renaynay May 16, 2024
b54ef5e
chore(nodebuilder/share): Clean up share module further
renaynay May 17, 2024
ca9dd21
fix(nodebuilder): Some provides added improperly
renaynay May 22, 2024
aa091af
chore(share/p2p/discovery): Remove EnableAdvertise as config param an…
renaynay May 23, 2024
da20efc
chore(nodebuilder/share | share/getters): Delegate lifecycle manageme…
renaynay May 23, 2024
b48d696
chore(share/p2p/peers): require tag for peer manager
renaynay May 23, 2024
2010374
doc(share/p2p/discovery): finish doc for advertise field
renaynay May 27, 2024
fe47aa1
chore(share/p2p/discovery): Start advertise from disc start instead o…
renaynay May 27, 2024
9056be6
Merge branch 'main' into shrex-getter-routing
renaynay May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -14,6 +13,7 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error {
func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !d.isWithinSamplingWindow(h) {
if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
Expand All @@ -183,14 +183,6 @@ func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
return nil
}

func (d *DASer) isWithinSamplingWindow(eh *header.ExtendedHeader) bool {
// if sampling window is not set, then all headers are within the window
if d.params.samplingWindow.Duration() == 0 {
return true
}
return time.Since(eh.Time()) <= d.params.samplingWindow.Duration()
}

// SamplingStats returns the current statistics over the DA sampling process.
func (d *DASer) SamplingStats(ctx context.Context) (SamplingStats, error) {
return d.sampler.stats(ctx)
Expand Down
6 changes: 5 additions & 1 deletion das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,11 @@ func TestDASer_SamplingWindow(t *testing.T) {
eh := headertest.RandExtendedHeader(t)
eh.RawHeader.Time = tt.timestamp

assert.Equal(t, tt.withinWindow, daser.isWithinSamplingWindow(eh))
assert.Equal(
t,
tt.withinWindow,
pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow),
)
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions nodebuilder/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
"github.com/celestiaorg/celestia-node/share/eds"
)

var (
Expand All @@ -56,6 +57,9 @@ type Node struct {
RPCServer *rpc.Server // not optional
GatewayServer *gateway.Server `optional:"true"`

// block store
EDSStore *eds.Store `optional:"true"`
renaynay marked this conversation as resolved.
Show resolved Hide resolved

// p2p components
Host host.Host
ConnGater *conngater.BasicConnectionGater
Expand Down
89 changes: 49 additions & 40 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,13 @@ import (
)

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
if !cfg.EnableService {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Options(
fx.Supply(light.Window),
)
case node.Full:
return fx.Options(
fx.Supply(archival.Window),
)
case node.Bridge:
return fx.Options(
fx.Supply(archival.Window),
fx.Provide(func() []core.Option {
return []core.Option{}
}),
)
default:
panic("unknown node type")
}
}

baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
)

fullAndBridgeComponents := fx.Options(
baseComponents,
fx.Provide(fx.Annotate(
newPrunerService,
fx.OnStart(func(ctx context.Context, p *pruner.Service) error {
Expand All @@ -52,34 +34,61 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
)

switch tp {
case node.Full:
// TODO: Eventually, light nodes will be capable of pruning samples
// in which case, this can be enabled.
case node.Light:
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
)
case node.Full:
opts := baseComponents
if cfg.EnableService {
opts = fullAndBridgeComponents
}
return fx.Module("prune",
opts,
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
fullAndBridgeComponents,
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
}),
)
}
return fx.Module("prune",
baseComponents,
fx.Provide(func(store *eds.Store) pruner.Pruner {
return full.NewPruner(store)
}),
fx.Supply(full.Window),
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
fx.Provide(func() []core.Option {
return []core.Option{}
}),
)
// TODO: Eventually, light nodes will be capable of pruning samples
// in which case, this can be enabled.
default:
panic("unknown node type")
}
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
return fx.Module("prune",
fx.Supply(light.Window),
)
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() pruner.AvailabilityWindow {
return light.Window
})
case node.Full, node.Bridge:
return fx.Provide(func() pruner.AvailabilityWindow {
if pruneEnabled {
return full.Window
}
return archival.Window
})
default:
panic("unknown node type")
}
Expand Down
27 changes: 0 additions & 27 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,15 @@ import (

"github.com/filecoin-project/dagstore"
"github.com/ipfs/boxo/blockservice"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"

"github.com/celestiaorg/celestia-app/pkg/da"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

const (
// fullNodesTag is the tag used to identify full nodes in the discovery service.
fullNodesTag = "full"
)

func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
return func(
r routing.ContentRouting,
h host.Host,
manager *peers.Manager,
) (*disc.Discovery, error) {
return disc.NewDiscovery(
cfg,
h,
routingdisc.NewRoutingDiscovery(r),
fullNodesTag,
disc.WithOnPeersUpdate(manager.UpdateNodePool),
)
}
}

func newShareModule(getter share.Getter, avail share.Availability) Module {
return &module{getter, avail}
}
Expand Down
Loading
Loading