Skip to content

Commit

Permalink
improve query performance by limiting query width to KValue peers
Browse files Browse the repository at this point in the history
This is actually still incorrect. We _should_ be limiting our query to
AlphaValue peers and then expanding to KValue peers once we run out of peers to
query. However, this is still much better and we can do that in a followup
commit.

Considerations: We may not want to merge this until we get the multipath lookup
patch. It turns out, our current DHT effectively explores _all_ paths.

fixes #290
  • Loading branch information
Stebalien committed Mar 8, 2019
1 parent 8eca2f2 commit d187dc4
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ require (
github.com/multiformats/go-multistream v0.0.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
golang.org/x/xerrors v0.0.0-20190212162355-a5947ffaace3
)
121 changes: 121 additions & 0 deletions kpeerset/kpeerset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package kpeerset

import (
"container/heap"
"math/big"
"sync"

"github.com/libp2p/go-libp2p-peer"
ks "github.com/whyrusleeping/go-keyspace"
)

// peerMetric pairs a peer with its precomputed XOR distance to the
// target key the containing set measures against. Distances are stored
// as *big.Int so they can be compared with Cmp without re-deriving the
// keyspace key on every heap operation.
type peerMetric struct {
// the peer this metric describes
peer peer.ID

// XOR-metric distance from the peer's keyspace key to the target key
metric *big.Int
}

// peerMetricHeap implements a heap of peerMetric items. Taken from
// go-libp2p-peerstore/queue but inverted: this heap sorts by _furthest_,
// so the root (index 0) is always the peer with the greatest distance.
type peerMetricHeap []*peerMetric

// Len reports the number of entries in the heap. Implements sort.Interface
// as required by container/heap.
func (ph peerMetricHeap) Len() int { return len(ph) }

// Less orders entry i before entry j when i is _farther_ from the target
// key, making the heap a max-heap on distance (root = furthest peer).
func (ph peerMetricHeap) Less(i, j int) bool {
	return ph[i].metric.Cmp(ph[j].metric) > 0
}

// Swap exchanges the entries at indices i and j. Implements sort.Interface
// as required by container/heap.
func (ph peerMetricHeap) Swap(i, j int) {
	tmp := ph[i]
	ph[i] = ph[j]
	ph[j] = tmp
}

// Push appends x (a *peerMetric) to the heap slice. Called only by
// container/heap, which then restores the heap invariant.
func (ph *peerMetricHeap) Push(x interface{}) {
	*ph = append(*ph, x.(*peerMetric))
}

// Pop removes and returns the last element of the heap slice (the one
// container/heap has already swapped into final position). Called only
// by container/heap.
func (ph *peerMetricHeap) Pop() interface{} {
	old := *ph
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array does not keep the
	// popped *peerMetric reachable and block garbage collection.
	old[n-1] = nil
	*ph = old[:n-1]
	return item
}

// KPeerSet tracks the K closest peers (by XOR distance) to a target key
// seen so far. It wraps a max-heap of peerMetric items (root = furthest
// member) so the worst member can be evicted in O(log k) when a closer
// peer arrives. All methods are safe for concurrent use.
type KPeerSet struct {
// maximum number of peers retained in the set
kvalue int

// from is the Key this PQ measures against
from ks.Key

// heap of peerMetric items, ordered furthest-first (see peerMetricHeap)
heap peerMetricHeap

// lock guards heap; readers take RLock, mutators take Lock
lock sync.RWMutex
}

// Len returns the current number of peers held in the set.
func (pq *KPeerSet) Len() int {
	pq.lock.RLock()
	n := len(pq.heap)
	pq.lock.RUnlock()
	return n
}

// Check reports whether p would currently be accepted by Add: either
// the set still has room (fewer than kvalue members), or p is strictly
// closer to the target key than the furthest member. Used to cheaply
// skip peers that cannot improve the set before dialing/querying them.
func (pq *KPeerSet) Check(p peer.ID) bool {
	pq.lock.RLock()
	defer pq.lock.RUnlock()

	if pq.heap.Len() < pq.kvalue {
		return true
	}

	distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
	// Accept only peers strictly closer than the furthest member
	// (heap[0]). This mirrors Add, which rejects on Cmp != -1; the
	// previous `!= -1` here was inverted and approved exactly the
	// peers Add would drop.
	return distance.Cmp(pq.heap[0].metric) == -1
}

// Add offers peer p to the set. When the set is full, p replaces the
// current furthest member only if p is strictly closer to the target
// key. Returns true if p was inserted, false if it was rejected.
func (pq *KPeerSet) Add(p peer.ID) bool {
	pq.lock.Lock()
	defer pq.lock.Unlock()

	d := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)

	if pq.heap.Len() >= pq.kvalue {
		// Full: reject unless strictly closer than the furthest
		// member (heap root), then evict that member to make room.
		if d.Cmp(pq.heap[0].metric) != -1 {
			return false
		}
		heap.Pop(&pq.heap)
	}

	entry := &peerMetric{peer: p, metric: d}
	heap.Push(&pq.heap, entry)
	return true
}

// Peers returns a snapshot of the set's members in heap (unsorted)
// order. The returned slice is freshly allocated and safe for the
// caller to retain or modify.
func (pq *KPeerSet) Peers() []peer.ID {
	pq.lock.RLock()
	defer pq.lock.RUnlock()

	// Allocate with length 0 and capacity len(pq.heap). The original
	// make([]peer.ID, len(pq.heap)) followed by append produced a
	// slice of double length whose first half was empty peer IDs.
	ret := make([]peer.ID, 0, len(pq.heap))
	for _, pm := range pq.heap {
		ret = append(ret, pm.peer)
	}
	return ret
}

// New constructs an empty KPeerSet that retains at most kvalue peers,
// measuring XOR distance against the keyspace key derived from `from`.
func New(kvalue int, from string) *KPeerSet {
	set := &KPeerSet{
		kvalue: kvalue,
		from:   ks.XORKeySpace.Key([]byte(from)),
	}
	set.heap = make(peerMetricHeap, 0, kvalue)
	return set
}
4 changes: 2 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
logger.Debugf("closestPeers query run error: %s", err)
}

if res != nil && res.queriedSet != nil {
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
if res != nil && len(res.finalSet) > 0 {
sorted := kb.SortClosestPeers(res.finalSet, kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
}
Expand Down
7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@
"hash": "QmW1UQYrsBotoorzgBcQ3HVj5RzZUzB6g1vQwnNfUuCVdN",
"name": "xerrors",
"version": "0.0.2"
},
{
"author": "whyrusleeping",
"hash": "QmUusaX99BZoELh7dmPgirqRQ1FAmMnmnBn3oiqDFGBUSc",
"name": "go-keyspace",
"version": "1.0.0"
}
],
"gxVersion": "0.4.0",
Expand All @@ -180,3 +186,4 @@
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "4.4.32"
}

39 changes: 27 additions & 12 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

kpeerset "github.com/libp2p/go-libp2p-kad-dht/kpeerset"

u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
todoctr "github.com/ipfs/go-todocounter"
Expand Down Expand Up @@ -34,8 +36,7 @@ type dhtQueryResult struct {
closerPeers []*pstore.PeerInfo // *
success bool

finalSet *pset.PeerSet
queriedSet *pset.PeerSet
finalSet []peer.ID
}

// constructs query
Expand Down Expand Up @@ -71,12 +72,12 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e
}

type dhtQueryRunner struct {
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersQueried *pset.PeerSet // peers successfully connected to and queried
peersDialed *dialQueue // peers we have dialed to
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
kPeers *kpeerset.KPeerSet // k best peers queried.
peersDialed *dialQueue // peers we have dialed to
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing

result *dhtQueryResult // query result
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error
Expand All @@ -98,7 +99,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
query: q,
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
peersQueried: pset.New(),
kPeers: kpeerset.New(KValue, q.key),
rateLimit: make(chan struct{}, q.concurrency),
peersToQuery: peersToQuery,
proc: proc,
Expand Down Expand Up @@ -176,8 +177,7 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
}

return &dhtQueryResult{
finalSet: r.peersSeen,
queriedSet: r.peersQueried,
finalSet: r.kPeers.Peers(),
}, err
}

Expand All @@ -192,6 +192,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
return
}

if !r.kPeers.Check(next) {
logger.Error("skipping peer before dialing")
return
}

notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
Expand Down Expand Up @@ -236,6 +241,11 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
}

func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
if !r.kPeers.Check(p) {
// Don't bother with this peer. We'll skip it in the query phase as well.
return nil
}

// short-circuit if we're already connected.
if r.query.dht.host.Network().Connectedness(p) == inet.Connected {
return nil
Expand Down Expand Up @@ -281,13 +291,18 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
r.rateLimit <- struct{}{}
}()

if !r.kPeers.Check(p) {
// Don't bother with this peer.
return
}

// finally, run the query against this peer
res, err := r.query.qfunc(ctx, p)

if err == nil {
// Make sure we only return DHT peers that actually respond to
// the query.
r.peersQueried.Add(p)
r.kPeers.Add(p)
}

if err != nil {
Expand Down

0 comments on commit d187dc4

Please sign in to comment.