Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat(sessionrequestsplitter): psuedo-randomization
Browse files Browse the repository at this point in the history
Move requests around between peers by a randomization algorithm so requests get spread out
  • Loading branch information
hannahhoward committed Jul 24, 2019
1 parent 5204f40 commit 5aa87d9
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 14 deletions.
4 changes: 4 additions & 0 deletions sessiondata/sessiondata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

// MaxOptimizedPeers is the maximum number of peers that the session peer
// manager will send to the splitter
const MaxOptimizedPeers = 32

// OptimizedPeer describes a peer and its level of optimization from 0 to 1.
type OptimizedPeer struct {
Peer peer.ID
Expand Down
2 changes: 1 addition & 1 deletion sessionpeermanager/peerdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

const (
newLatencyWeight = 0.5
newLatencyWeight = 0.3
)

type peerData struct {
Expand Down
5 changes: 2 additions & 3 deletions sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

const (
defaultTimeoutDuration = 5 * time.Second
maxOptimizedPeers = 32
unoptimizedTagValue = 5 // tag value for "unoptimized" session peers.
optimizedTagValue = 10 // tag value for "optimized" session peers.
)
Expand Down Expand Up @@ -279,8 +278,8 @@ type getPeersMessage struct {
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
if maxPeers > bssd.MaxOptimizedPeers {
maxPeers = bssd.MaxOptimizedPeers
}
var bestPeerLatency float64
if len(spm.optimizedPeersArr) > 0 {
Expand Down
11 changes: 6 additions & 5 deletions sessionpeermanager/sessionpeermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"

cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestOrderingPeers(t *testing.T) {
sessionPeerManager.RecordPeerResponse(peer3, c[0])

sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != maxOptimizedPeers {
if len(sessionPeers) != bssd.MaxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}

Expand All @@ -203,7 +204,7 @@ func TestOrderingPeers(t *testing.T) {
}

// should other peers rating of zero
for i := 3; i < maxOptimizedPeers; i++ {
for i := 3; i < bssd.MaxOptimizedPeers; i++ {
if sessionPeers[i].OptimizationRating != 0.0 {
t.Fatal("Did not assign rating to unoptimized peer correctly")
}
Expand All @@ -219,7 +220,7 @@ func TestOrderingPeers(t *testing.T) {

// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(nextSessionPeers) != maxOptimizedPeers {
if len(nextSessionPeers) != bssd.MaxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}

Expand All @@ -231,12 +232,12 @@ func TestOrderingPeers(t *testing.T) {

// should randomize other peers
totalSame := 0
for i := 3; i < maxOptimizedPeers; i++ {
for i := 3; i < bssd.MaxOptimizedPeers; i++ {
if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
totalSame++
}
}
if totalSame >= maxOptimizedPeers-3 {
if totalSame >= bssd.MaxOptimizedPeers-3 {
t.Fatal("should not return the same random peers each time")
}
}
Expand Down
101 changes: 101 additions & 0 deletions sessionrequestsplitter/peersFromOptimizedPeers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package sessionrequestsplitter

import (
"math/rand"
"sync"

bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/libp2p/go-libp2p-core/peer"
)

type skipMap struct {
skips [bssd.MaxOptimizedPeers]int
start int
previousIndex int
currentIndex int
}

var skipMapPool = sync.Pool{
New: func() interface{} {
return new(skipMap)
},
}

func newSkipMap(length int) *skipMap {
sm := skipMapPool.Get().(*skipMap)
sm.Reset(length)
return sm
}

func returnSkipMap(sm *skipMap) {
skipMapPool.Put(sm)
}

func (sm *skipMap) Reset(length int) {
for i := 0; i < length; i++ {
sm.skips[i] = i + 1
}
sm.start = 0
sm.currentIndex = 0
sm.previousIndex = -1
}

func (sm *skipMap) BeginTraverse() {
sm.currentIndex = sm.start
sm.previousIndex = -1
}

func (sm *skipMap) Advance() {
sm.previousIndex = sm.currentIndex
sm.currentIndex = sm.skips[sm.currentIndex]
}

func (sm *skipMap) RemoveCurrent() {
if sm.currentIndex == sm.start {
sm.start = sm.skips[sm.currentIndex]
} else {
sm.skips[sm.previousIndex] = sm.skips[sm.currentIndex]
}
}

func (sm *skipMap) CurrentIndex() int {
return sm.currentIndex
}

func transformOptimization(optimizationRating float64) float64 {
return optimizationRating * optimizationRating
}

func getOptimizationTotal(optimizedPeers []bssd.OptimizedPeer) float64 {
optimizationTotal := 0.0
for _, optimizedPeer := range optimizedPeers {
optimizationTotal += transformOptimization(optimizedPeer.OptimizationRating)
}
return optimizationTotal
}

func peersFromOptimizedPeers(optimizedPeers []bssd.OptimizedPeer) []peer.ID {
optimizationTotal := getOptimizationTotal(optimizedPeers)
sm := newSkipMap(len(optimizedPeers))
peers := make([]peer.ID, 0, len(optimizedPeers))
for range optimizedPeers {
randValue := rand.Float64()
randTarget := randValue * optimizationTotal
targetSoFar := 0.0
sm.BeginTraverse()
for {
currentPeer := optimizedPeers[sm.CurrentIndex()]
currentRating := transformOptimization(currentPeer.OptimizationRating)
targetSoFar += currentRating
if targetSoFar+0.000001 >= randTarget {
peers = append(peers, currentPeer.Peer)
optimizationTotal -= currentRating
sm.RemoveCurrent()
break
}
sm.Advance()
}
}
returnSkipMap(sm)
return peers
}
68 changes: 68 additions & 0 deletions sessionrequestsplitter/peersFromOptimizedPeers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package sessionrequestsplitter

import (
"fmt"
"math"
"testing"

bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"
"github.com/libp2p/go-libp2p-core/peer"
)

func sampleDistributions(optimizedPeers []bssd.OptimizedPeer, sampleSize int) []map[peer.ID]float64 {
sampleCounts := make([]map[peer.ID]int, len(optimizedPeers))
for i := range optimizedPeers {
sampleCounts[i] = make(map[peer.ID]int)
}
for i := 0; i < sampleSize; i++ {
samplePeers := peersFromOptimizedPeers(optimizedPeers)
for j, peer := range samplePeers {
sampleCounts[j][peer] = sampleCounts[j][peer] + 1
}
}
sampleMean := make([]map[peer.ID]float64, len(optimizedPeers))
for i, sampleCount := range sampleCounts {
sampleMean[i] = make(map[peer.ID]float64)
for p, count := range sampleCount {
sampleMean[i][p] = float64(count) / float64(sampleSize)
}
}
return sampleMean
}

func TestDistributionsFromOptimizations(t *testing.T) {
peers := testutil.GeneratePeers(3)
optimizedPeers := make([]bssd.OptimizedPeer, 0, len(peers))
for i, peer := range peers {
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{peer, 1.0 - (float64(i) * 0.4)})
}
distributions := sampleDistributions(optimizedPeers, 1000)
expectedDistributions := []map[peer.ID]float64{
map[peer.ID]float64{
peers[0]: 1.0 / 1.4,
peers[1]: 0.36 / 1.4,
peers[2]: 0.04 / 1.4,
},
map[peer.ID]float64{
peers[0]: (0.36/1.4)*(1.0/1.04) + (0.04/1.4)*(1.0/1.36),
peers[1]: (1.0/1.4)*(0.36/0.4) + (0.04/1.4)*(0.36/1.36),
peers[2]: (1.0/1.4)*(0.04/0.4) + (0.36/1.4)*(0.04/1.04),
},
map[peer.ID]float64{
peers[0]: (0.04/1.4)*(0.36/1.36) + (0.36/1.4)*(0.04/1.04),
peers[1]: (0.04/1.4)*(1.0/1.36) + (1.0/1.4)*(0.04/0.4),
peers[2]: (0.36/1.4)*(1.0/1.04) + (1.0/1.4)*(0.36/0.4),
},
}
for i, distribution := range distributions {
expectedDistribution := expectedDistributions[i]
for p, value := range distribution {
expectedValue := expectedDistribution[p]
fmt.Printf("Value: %f, Expected: %f\n", value, expectedValue)
if math.Abs(value-expectedValue) >= 0.02 {
t.Fatal("Distribution did not match expected distribution")
}
}
}
}
7 changes: 2 additions & 5 deletions sessionrequestsplitter/sessionrequestsplitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
minReceivedToAdjustSplit = 2
maxSplit = 16
maxAcceptableDupes = 0.4
minDuplesToTryLessSplits = 0.2
minDuplesToTryLessSplits = 0.1
initialSplit = 2
)

Expand Down Expand Up @@ -105,10 +105,7 @@ type splitRequestMessage struct {
func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split := srs.split
// first iteration ignore optimization ratings
peers := make([]peer.ID, len(s.optimizedPeers))
for i, optimizedPeer := range s.optimizedPeers {
peers[i] = optimizedPeer.Peer
}
peers := peersFromOptimizedPeers(s.optimizedPeers)
ks := s.ks
if len(peers) < split {
split = len(peers)
Expand Down

0 comments on commit 5aa87d9

Please sign in to comment.