Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Persisting/seeding a routing table #315

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

periodicproc "github.com/jbenet/goprocess/periodic"
"golang.org/x/xerrors"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
Expand Down Expand Up @@ -80,17 +81,42 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht := makeDHT(ctx, h, &cfg)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})

var (
candidates []peer.ID
err error
)

if cfg.Snapshotter != nil {
candidates, err = cfg.Snapshotter.Load()
if err != nil {
logger.Warningf("error while loading snapshot of DHT routing table: %s", err)
}
sproc := periodicproc.Tick(cfg.SnapshotInterval, func(proc goprocess.Process) {
logger.Debugf("storing snapshot of DHT routing table")
err := cfg.Snapshotter.Store(dht.routingTable)
if err != nil {
logger.Warningf("error while storing snapshot of DHT routing table snapshot: %s", err)
}
})
dht.proc.AddChild(sproc)
}

if err := cfg.Seeder.Seed(dht.routingTable, candidates, cfg.FallbackPeers); err != nil {
logger.Warningf("error while seedindg candidates to the routing table: %s", err)
}

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

Expand Down Expand Up @@ -125,7 +151,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
func makeDHT(ctx context.Context, h host.Host, cfg *opts.Options) *IpfsDHT {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
Expand All @@ -137,22 +163,21 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
}

return &IpfsDHT{
datastore: dstore,
datastore: cfg.Datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
providers: providers.NewProviderManager(ctx, h.ID(), cfg.Datastore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
protocols: cfg.Protocols,
}
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
Expand Down
40 changes: 21 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,32 @@ module github.com/libp2p/go-libp2p-kad-dht
require (
github.com/gogo/protobuf v1.2.1
github.com/hashicorp/golang-lru v0.5.1
github.com/ipfs/go-cid v0.0.1
github.com/ipfs/go-datastore v0.0.1
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-todocounter v0.0.1
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/libp2p/go-libp2p v0.0.2
github.com/libp2p/go-libp2p-crypto v0.0.1
github.com/libp2p/go-libp2p-host v0.0.1
github.com/libp2p/go-libp2p-kbucket v0.0.1
github.com/libp2p/go-libp2p-net v0.0.1
github.com/libp2p/go-libp2p-peer v0.0.1
github.com/libp2p/go-libp2p-peerstore v0.0.1
github.com/libp2p/go-libp2p-protocol v0.0.1
github.com/libp2p/go-libp2p-record v0.0.1
github.com/libp2p/go-libp2p-routing v0.0.1
github.com/libp2p/go-libp2p-swarm v0.0.1
github.com/libp2p/go-testutil v0.0.1
github.com/mr-tron/base58 v1.1.0
github.com/multiformats/go-multiaddr v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.1.0
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-host v0.1.0
github.com/libp2p/go-libp2p-kbucket v0.2.0
github.com/libp2p/go-libp2p-net v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.0
github.com/libp2p/go-libp2p-protocol v0.1.0
github.com/libp2p/go-libp2p-record v0.1.0
github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.3
github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.2
github.com/multiformats/go-multistream v0.0.1
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3
go.opencensus.io v0.21.0
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522
)
Loading