Skip to content

Commit

Permalink
feat: add optimistic provide
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Mar 25, 2023
1 parent 1457b4f commit 296ed70
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 86 deletions.
18 changes: 10 additions & 8 deletions config/experiments.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package config

type Experiments struct {
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}
21 changes: 14 additions & 7 deletions core/node/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return out, err
}

routingOptArgs := RoutingOptionArgs{
Ctx: ctx,
Datastore: params.Repo.Datastore(),
Validator: params.Validator,
BootstrapPeers: bootstrappers,
OptimisticProvide: cfg.Experimental.OptimisticProvide,
OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize,
}
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
r, err := params.RoutingOption(
ctx, h,
params.Repo.Datastore(),
params.Validator,
bootstrappers...,
)
args := routingOptArgs
args.Host = h
r, err := params.RoutingOption(args)
out.Routing = r
return r, err
}))
Expand All @@ -69,10 +74,12 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return P2PHostOut{}, err
}

routingOptArgs.Host = out.Host

// this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing!
if out.Routing == nil {
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator, bootstrappers...)
r, err := params.RoutingOption(routingOptArgs)
if err != nil {
return P2PHostOut{}, err
}
Expand Down
109 changes: 40 additions & 69 deletions core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ import (
routing "github.com/libp2p/go-libp2p/core/routing"
)

type RoutingOption func(
context.Context,
host.Host,
datastore.Batching,
record.Validator,
...peer.AddrInfo,
) (routing.Routing, error)
type RoutingOptionArgs struct {
Ctx context.Context
Host host.Host
Datastore datastore.Batching
Validator record.Validator
BootstrapPeers []peer.AddrInfo
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}

type RoutingOption func(args RoutingOptionArgs) (routing.Routing, error)

// Default HTTP routers used in parallel to DHT when Routing.Type = "auto"
var defaultHTTPRouters = []string{
Expand All @@ -40,25 +44,13 @@ func init() {
}

// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
// Defined routers will be queried in parallel (optimizing for response speed)
// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
var routers []*routinghelpers.ParallelRouter

dhtRouting, err := routingOpt(ctx, host, dstore, validator, bootstrapPeers...)
dhtRouting, err := routingOpt(args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -98,54 +90,39 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
}

// constructDHTRouting is used when Routing.Type = "dht"
func constructDHTRouting(mode dht.ModeOpt) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func constructDHTRouting(mode dht.ModeOpt) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
dhtOpts := []dht.Option{
dht.Concurrency(10),
dht.Mode(mode),
dht.Datastore(args.Datastore),
dht.Validator(args.Validator),
dht.OptimisticProvideJobsPoolSize(size int)
}
if args.OptimisticProvide {
dhtOpts = append(dhtOpts, dht.EnableOptimisticProvide())
}
if args.OptimisticProvideJobsPoolSize != 0 {
dhtOpts = append(dhtOpts, dht.OptimisticProvideJobsPoolSize(args.OptimisticProvideJobsPoolSize))
}
return dual.New(
ctx, host,
dual.DHTOption(
dht.Concurrency(10),
dht.Mode(mode),
dht.Datastore(dstore),
dht.Validator(validator)),
dual.WanDHTOption(dht.BootstrapPeers(bootstrapPeers...)),
args.Ctx, args.Host,
dual.DHTOption(dhtOpts...),
dual.WanDHTOption(dht.BootstrapPeers(args.BootstrapPeers...)),
)
}
}

// ConstructDelegatedRouting is used when Routing.Type = "custom"
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
return irouting.Parse(routers, methods,
&irouting.ExtraDHTParams{
BootstrapPeers: bootstrapPeers,
Host: host,
Validator: validator,
Datastore: dstore,
Context: ctx,
BootstrapPeers: args.BootstrapPeers,
Host: args.Host,
Validator: args.Validator,
Datastore: args.Datastore,
Context: args.Ctx,
},
&irouting.ExtraHTTPParams{
PeerID: peerID,
Expand All @@ -155,13 +132,7 @@ func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, p
}
}

func constructNilRouting(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func constructNilRouting(_ RoutingOptionArgs) (routing.Routing, error) {
return routinghelpers.Null{}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ require (
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect
google.golang.org/grpc v1.46.0 // indirect
Expand All @@ -246,3 +247,5 @@ require (
)

go 1.18

replace github.com/libp2p/go-libp2p-kad-dht => ../go-libp2p-kad-dht
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -808,8 +808,6 @@ github.com/libp2p/go-libp2p-gostream v0.5.0 h1:niNGTUrFoUDP/8jxMgu97zngMO+UGYBpV
github.com/libp2p/go-libp2p-gostream v0.5.0/go.mod h1:rXrb0CqfcRRxa7m3RSKORQiKiWgk3IPeXWda66ZXKsA=
github.com/libp2p/go-libp2p-http v0.4.0 h1:V+f9Rhe/8GkColmXoyJyA0NVsN9F3TCLZgW2hwjoX5w=
github.com/libp2p/go-libp2p-http v0.4.0/go.mod h1:92tmLGrlBliQFDlZRpBXT3BJM7rGFONy0vsNrG/bMPg=
github.com/libp2p/go-libp2p-kad-dht v0.21.1 h1:xpfp8/t9+X2ip1l8Umap1/UGNnJ3RHJgKGAEsnRAlTo=
github.com/libp2p/go-libp2p-kad-dht v0.21.1/go.mod h1:Oy8wvbdjpB70eS5AaFaI68tOtrdo3KylTvXDjikxqFo=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
Expand Down Expand Up @@ -1788,6 +1786,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
Expand Down

0 comments on commit 296ed70

Please sign in to comment.