Skip to content

Commit

Permalink
add finish "slop"
Browse files Browse the repository at this point in the history
We try to "finish" with `KValue + slop` peers but only wait for KValue. This
means we don't have to wait for timeouts.
  • Loading branch information
Stebalien committed Mar 12, 2019
1 parent 56d67c1 commit 72782a9
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (

var maxQueryConcurrency = AlphaValue

// We "finish" queries with KValue + finishSlop peers but only wait for KValue
// peers. This helps us account for unreachable peers.
var finishSlop = 4

type dhtQuery struct {
dht *IpfsDHT
key string // the key we're querying for
Expand Down Expand Up @@ -192,9 +196,6 @@ func (r *dhtQueryRecurseResult) Finish(ctx context.Context) ([]peer.ID, error) {
}

func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context.Context, peer.ID) error) ([]peer.ID, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Get a sorted list of peers to query.
failed := make(map[peer.ID]bool, len(r.failed))
for _, p := range r.failed {
Expand All @@ -218,9 +219,10 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context.
closest = kb.SortClosestPeers(closest, kb.ConvertKey(r.query.key))

// Query them.
sloppyK := KValue + finishSlop
bucket := make([]peer.ID, 0, KValue)
workQ := make(chan peer.ID)
resultQ := make(chan peer.ID, KValue)
resultQ := make(chan peer.ID, sloppyK)

newQuery := fn != nil
if !newQuery {
Expand All @@ -231,8 +233,13 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context.
}

var wg sync.WaitGroup
wg.Add(KValue)
for i := 0; i < KValue; i++ {
defer wg.Wait()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

wg.Add(sloppyK)
for i := 0; i < sloppyK; i++ {
go func() {
defer wg.Done()
for p := range workQ {
Expand All @@ -243,6 +250,7 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context.
}
}()
}

go func() {
wg.Wait()
close(resultQ)
Expand All @@ -269,11 +277,17 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context.
bucket = append(bucket, successPeer)
}
}

close(workQ)

for p := range resultQ {
for len(bucket) < KValue {
p, ok := <-resultQ
if !ok {
break
}
bucket = append(bucket, p)
}

return bucket, ctx.Err()
}

Expand Down

0 comments on commit 72782a9

Please sign in to comment.