Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve query performance by limiting query width to KValue peers #291

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
987c507
only add successfully queried peers to the peersQueried set
Stebalien Mar 8, 2019
449b434
improve query performance by limiting query width to KValue peers
Stebalien Mar 8, 2019
a0bc445
run all queries to completion
Stebalien Mar 12, 2019
13e5e29
findpeer: return early
Stebalien May 2, 2019
2648b2d
findpeer: drain addresses before processing new ones
Stebalien May 3, 2019
c9f523d
implement kademlia
Stebalien Mar 12, 2019
78a2de6
query(nit): reliably fill rateLimit chan
Stebalien May 2, 2019
2a209f1
query: count already queried peers
Stebalien Jun 6, 2019
da40930
test: bootstrap before testing
Stebalien Jun 6, 2019
6ce8383
Change bucket size to be configurable
michaelavila Jun 26, 2019
1afff31
dont do all those other things
whyrusleeping Jun 26, 2019
bd07cbf
Merge pull request #361 from libp2p/chores/make-bucket-size-configurable
whyrusleeping Jun 26, 2019
604ac88
don't add peers with only private addresses to your routing table (#360)
whyrusleeping Jun 26, 2019
4b31e56
dht mode toggling (modulo dynamic switching) (#350)
whyrusleeping Jun 26, 2019
a4cabc7
consume identify events to evaluate routing table addition. (#365)
raulk Jun 26, 2019
8fe679a
filter unworkable peers in queries + enhanced logging (#363)
raulk Jun 26, 2019
138991d
remove periodic printing of routing table.
raulk Jun 26, 2019
1f4263f
Merge branch 'stabilize' into fix/closest-peers
raulk Jun 27, 2019
eed72b8
fix distance output in logging.
raulk Jun 27, 2019
836a15c
Merge branch 'stabilize' into fix/closest-peers
raulk Jun 27, 2019
22017f0
start with k peers instead of alpha
Stebalien Jun 6, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,10 @@ func TestFindClosestPeers(t *testing.T) {
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
}

for i := 0; i < nDHTs; i++ {
dhts[i].BootstrapOnce(ctx, DefaultBootstrapConfig)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I spent a while trying to avoid having to fix this test without complicating the query logic too much. Then I realized that was just stupid.

peers, err := dhts[1].GetClosestPeers(ctx, "foo")
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/libp2p/go-libp2p-kad-dht

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.2.1
github.com/hashicorp/golang-lru v0.5.1
github.com/ipfs/go-cid v0.0.2
Expand All @@ -12,9 +13,9 @@ require (
github.com/libp2p/go-libp2p v0.1.0
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-kbucket v0.2.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.0
github.com/libp2p/go-libp2p-record v0.1.0
github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.3
github.com/mr-tron/base58 v1.1.2
Expand All @@ -23,6 +24,7 @@ require (
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
go.opencensus.io v0.21.0
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down Expand Up @@ -134,8 +136,6 @@ github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpo
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-record v0.1.0 h1:wHwBGbFzymoIl69BpgwIu0O6ta3TXGcMPvHUAcodzRc=
github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
github.com/libp2p/go-libp2p-routing v0.1.0 h1:hFnj3WR3E2tOcKaGpyzfP4gvFZ3t8JkQmbapN0Ct+oU=
github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE=
github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ=
github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
github.com/libp2p/go-libp2p-swarm v0.1.0 h1:HrFk2p0awrGEgch9JXK/qp/hfjqQfgNxpLWnCiWPg5s=
Expand Down
2 changes: 1 addition & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
pstore "github.com/libp2p/go-libp2p-peerstore"
recpb "github.com/libp2p/go-libp2p-record/pb"
base32 "github.com/whyrusleeping/base32"
)
Expand Down
121 changes: 121 additions & 0 deletions kpeerset/kpeerset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package kpeerset

import (
"container/heap"
"math/big"
"sync"

"github.com/libp2p/go-libp2p-peer"
ks "github.com/whyrusleeping/go-keyspace"
)

// peerMetric tracks a peer and its distance to something else.
type peerMetric struct {
// the peer
peer peer.ID

// big.Int for XOR metric
metric *big.Int
}

// peerMetricHeap implements a heap of peerDistances. Taken from
// go-libp2p-peerstore/queue but inverted. This heap sorts by _furthest_.
type peerMetricHeap []*peerMetric

func (ph peerMetricHeap) Len() int {
return len(ph)
}

func (ph peerMetricHeap) Less(i, j int) bool {
return 1 == ph[i].metric.Cmp(ph[j].metric)
}

func (ph peerMetricHeap) Swap(i, j int) {
ph[i], ph[j] = ph[j], ph[i]
}

func (ph *peerMetricHeap) Push(x interface{}) {
item := x.(*peerMetric)
*ph = append(*ph, item)
}

func (ph *peerMetricHeap) Pop() interface{} {
old := *ph
n := len(old)
item := old[n-1]
*ph = old[0 : n-1]
return item
}

// KPeerSet implements heap.Interface and PeerQueue
type KPeerSet struct {
kvalue int

// from is the Key this PQ measures against
from ks.Key

// heap is a heap of peerDistance items
heap peerMetricHeap

lock sync.RWMutex
}

func (pq *KPeerSet) Len() int {
pq.lock.RLock()
defer pq.lock.RUnlock()

return len(pq.heap)
}

func (pq *KPeerSet) Check(p peer.ID) bool {
pq.lock.RLock()
defer pq.lock.RUnlock()

if pq.heap.Len() < pq.kvalue {
return true
}

distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
return distance.Cmp(pq.heap[0].metric) != -1
}

func (pq *KPeerSet) Add(p peer.ID) bool {
pq.lock.Lock()
defer pq.lock.Unlock()

distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)

if pq.heap.Len() >= pq.kvalue {
// If we're not closer than the worst peer, drop this.
if distance.Cmp(pq.heap[0].metric) != -1 {
return false
}
// Replacing something, remove it.
heap.Pop(&pq.heap)
}

heap.Push(&pq.heap, &peerMetric{
peer: p,
metric: distance,
})
return true
}

func (pq *KPeerSet) Peers() []peer.ID {
pq.lock.RLock()
defer pq.lock.RUnlock()

ret := make([]peer.ID, len(pq.heap))
for _, pm := range pq.heap {
ret = append(ret, pm.peer)
}
return ret
}

func New(kvalue int, from string) *KPeerSet {
return &KPeerSet{
from: ks.XORKeySpace.Key([]byte(from)),
kvalue: kvalue,
heap: make([]*peerMetric, 0, kvalue),
}
}
59 changes: 28 additions & 31 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"strings"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
notif "github.com/libp2p/go-libp2p-routing/notifications"
)

func tryFormatLoggableKey(k string) (string, error) {
Expand Down Expand Up @@ -63,13 +63,33 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee

out := make(chan peer.ID, KValue)

query := dht.newClosestPeersQuery(ctx, key, nil)

go func() {
defer close(out)
defer e.Done()
// run it!
res, err := query.Run(ctx, tablepeers)
if err != nil {
logger.Debugf("closestPeers query run error: %s", err)
}

for _, p := range res {
out <- p
}
}()

return out, nil
}

func (dht *IpfsDHT) newClosestPeersQuery(ctx context.Context, key string, finish finishFunc) *dhtQuery {
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
return dht.newQuery(key, func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
routing.PublishQueryEvent(parent, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

Expand All @@ -81,35 +101,12 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.PeerResponse,
routing.PublishQueryEvent(parent, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})

return &dhtQueryResult{closerPeers: peers}, nil
})

go func() {
defer close(out)
defer e.Done()
// run it!
res, err := query.Run(ctx, tablepeers)
if err != nil {
logger.Debugf("closestPeers query run error: %s", err)
}

if res != nil && res.queriedSet != nil {
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
}

for _, p := range sorted {
out <- p
}
}
}()

return out, nil
return peers, nil
}, finish)
}
Loading