Skip to content

Commit

Permalink
non-interface changes as per adin's review
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Dec 15, 2019
1 parent 9965eba commit bdeb350
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 144 deletions.
73 changes: 33 additions & 40 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
pkgerr "github.com/pkg/errors"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -82,8 +83,12 @@ type IpfsDHT struct {

triggerBootstrap chan struct{}

seedsProposer persist.SeedsProposer
maxRecordAge time.Duration
seedsProposer persist.SeedsProposer
seederDialTimeout time.Duration
totalSeederDialTimeout time.Duration
seederConcurrentDials int

maxRecordAge time.Duration

// Allows disabling dht subsystems. These should _only_ be set on
// "forked" DHTs (e.g., DHTs with custom protocols and/or private
Expand All @@ -103,24 +108,23 @@ var (

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
cfg := &opts.Options{}
cfg.BucketSize = KValue
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}

// set seedsProposer, snapshotter & fallback peers if not set
seedsProposer := cfg.Persistence.SeedsProposer
if seedsProposer == nil {
seedsProposer = persist.NewRandomSeedsProposer(h, persist.DefaultRndSeederTarget)
if cfg.Persistence.SeedsProposer == nil {
cfg.Persistence.SeedsProposer = persist.NewRandomSeedsProposer(h, persist.DefaultRndSeederTarget)
}
snapshotter := cfg.Persistence.Snapshotter
if snapshotter == nil {
s, err := persist.NewDatastoreSnapshotter(cfg.Datastore, persist.DefaultSnapshotNS)
// should never happen
if err != nil {
logger.Error("failed to initialize the default datastore backed snapshotter")
panic(err)
logger.Errorf("failed to initialize the default datastore backed snapshotter, err: %s", err)
return nil, pkgerr.WithMessage(err, "failed to initialize the default datastore backed snapshotter")
}
snapshotter = s
}
Expand All @@ -129,18 +133,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
cfg.Persistence.FallbackPeers = getDefaultBootstrapPeerIDs()
}

dht := makeDHT(ctx, h, &cfg, cfg.BucketSize)

dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout

dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
dht.bootstrapCfg = cfg.BootstrapConfig
dht.seedsProposer = seedsProposer
dht := makeDHT(ctx, h, cfg)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
Expand Down Expand Up @@ -204,9 +197,9 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
func makeDHT(ctx context.Context, h host.Host, cfg *opts.Options) *IpfsDHT {
self := kb.ConvertPeerID(h.ID())
rt := kb.NewRoutingTable(bucketSize, self, time.Minute, h.Peerstore())
rt := kb.NewRoutingTable(cfg.BucketSize, self, time.Minute, h.Peerstore())
cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -218,37 +211,37 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
cmgr.UntagPeer(p, "kbucket")
}

// TODO this is just an example of how the Persist/Seeder API would be used.
// We should set the Snapshotter and the Seeder as fields in IpfsDHT.
/*if cfg.Persistence != nil {
if cfg.Persistence.Snapshotter != nil && cfg.Persistence.Seeder != nil {
candidates, err := cfg.Persistence.Snapshotter.Load()
if err != nil {
logger.Warningf("error while loading a previous snapshot: %s", err)
}
if err = cfg.Persistence.Seeder.Seed(rt, candidates, cfg.Persistence.FallbackPeers); err != nil {
logger.Warningf("error while seedindg candidates to the routing table: %s", err)
}
}
}*/

dht := &IpfsDHT{
datastore: dstore,
datastore: cfg.Datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
providers: providers.NewProviderManager(ctx, h.ID(), cfg.Datastore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
protocols: cfg.Protocols,
bucketSize: cfg.BucketSize,
triggerRtRefresh: make(chan chan<- error),
}

dht.ctx = dht.newContextWithLocalTags(ctx)

dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout

dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
dht.bootstrapCfg = cfg.BootstrapConfig

dht.seedsProposer = cfg.Persistence.SeedsProposer
dht.seederDialTimeout = cfg.Persistence.SeederDialTimeout
dht.totalSeederDialTimeout = cfg.Persistence.TotalSeederDialTimeout
dht.seederConcurrentDials = cfg.Persistence.SeederConcurrentDials

return dht
}

Expand Down
14 changes: 14 additions & 0 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -43,6 +44,19 @@ func init() {
}
}

func getDefaultBootstrapPeerIDs() []peer.ID {
var defaultBootstrapPeerIDs []peer.ID
for i := range DefaultBootstrapPeers {
info, err := peer.AddrInfoFromP2pAddr(DefaultBootstrapPeers[i])
if err != nil {
logger.Errorf("failed to get peerID for peer with multiaddress %s: error is %s", DefaultBootstrapPeers[i].String(), err)
continue
}
defaultBootstrapPeerIDs = append(defaultBootstrapPeerIDs, info.ID)
}
return defaultBootstrapPeerIDs
}

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
Expand Down
22 changes: 7 additions & 15 deletions dht_rt_seeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,18 @@ package dht

import (
"context"
"time"

"github.com/ipfs/go-todocounter"
"github.com/libp2p/go-libp2p-core/peer"
)

// SeederDialTimeout is the grace period for one dial attempt
var SeederDialTimeout = 5 * time.Second

// TotalSeederDialTimeout is the total grace period for a group of dial attempts
var TotalSeederDialTimeout = 30 * time.Second

// SeederConcurrentDials is the number of peers we will dial simultaneously
var SeederConcurrentDials = 50

func (dht *IpfsDHT) seedRoutingTable(candidates, fallbacks []peer.ID) error {
peersFromSet := func(m map[peer.ID]struct{}) []peer.ID {
var peers []peer.ID
peers := make([]peer.ID, len(m))
i := 0
for p := range m {
peers = append(peers, p)
peers[i] = p
i++
}

return peers
Expand Down Expand Up @@ -55,7 +47,7 @@ func (dht *IpfsDHT) seedRoutingTable(candidates, fallbacks []peer.ID) error {

// attempts to dial to a given peer to verify it's available
dialFn := func(ctx context.Context, p peer.ID, res chan<- result) {
childCtx, cancel := context.WithTimeout(ctx, SeederDialTimeout)
childCtx, cancel := context.WithTimeout(ctx, dht.seederDialTimeout)
defer cancel()
_, err := dht.host.Network().DialPeer(childCtx, p)
select {
Expand All @@ -65,11 +57,11 @@ func (dht *IpfsDHT) seedRoutingTable(candidates, fallbacks []peer.ID) error {
}

resCh := make(chan result) // dial results.
ctx, cancel := context.WithTimeout(dht.ctx, TotalSeederDialTimeout)
ctx, cancel := context.WithTimeout(dht.ctx, dht.totalSeederDialTimeout)
defer cancel()

// start dialing
semaphore := make(chan struct{}, SeederConcurrentDials)
semaphore := make(chan struct{}, dht.seederConcurrentDials)
go func(peers []peer.ID) {
for _, p := range peers {
semaphore <- struct{}{}
Expand Down
20 changes: 8 additions & 12 deletions dht_rt_seeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,18 @@ func TestRTSeeder(t *testing.T) {
nFallbacks int // fallback list
expectedNumPeersInRoutingTable int // number of peers we expect in the routing table after seeding is complete
}{
"Only Candidates": {10, 1, 0,
9},

"Candidates + Fallbacks": {10, 4, 7,
13},

"Only Fallbacks": {5, 5, 9,
9},

"Empty Candidates": {0, 0, 5,
5},
"Only Candidates": {10, 1, 0, 9},
"Candidates + Fallbacks": {10, 4, 7, 13},
"Only Fallbacks": {5, 5, 9, 9},
"Empty Candidates": {0, 0, 5, 5},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for name, testcase := range testCases {
// create host for self
self := bhost.New(swarmt.GenSwarm(t, context.Background(), swarmt.OptDisableReuseport))
self := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

// create candidate hosts & add them to the peer store
candidateHosts := make([]bhost.BasicHost, testcase.nTotalCandidates)
Expand Down
Loading

0 comments on commit bdeb350

Please sign in to comment.