From 5aa87d9ddacce7b156627ee130a03ad9c9d7a24c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 8 Jul 2019 16:04:44 -0700 Subject: [PATCH] feat(sessionrequestsplitter): psuedo-randomization Move requests around between peers by a randomization algorithm so requests get spread out --- sessiondata/sessiondata.go | 4 + sessionpeermanager/peerdata.go | 2 +- sessionpeermanager/sessionpeermanager.go | 5 +- sessionpeermanager/sessionpeermanager_test.go | 11 +- .../peersFromOptimizedPeers.go | 101 ++++++++++++++++++ .../peersFromOptimizedPeers_test.go | 68 ++++++++++++ .../sessionrequestsplitter.go | 7 +- 7 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 sessionrequestsplitter/peersFromOptimizedPeers.go create mode 100644 sessionrequestsplitter/peersFromOptimizedPeers_test.go diff --git a/sessiondata/sessiondata.go b/sessiondata/sessiondata.go index a56f93be..ce38b433 100644 --- a/sessiondata/sessiondata.go +++ b/sessiondata/sessiondata.go @@ -5,6 +5,10 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) +// MaxOptimizedPeers is the maximum number of peers that the session peer +// manager will send to the splitter +const MaxOptimizedPeers = 32 + // OptimizedPeer describes a peer and its level of optimization from 0 to 1. type OptimizedPeer struct { Peer peer.ID diff --git a/sessionpeermanager/peerdata.go b/sessionpeermanager/peerdata.go index a0619858..eb9f7eda 100644 --- a/sessionpeermanager/peerdata.go +++ b/sessionpeermanager/peerdata.go @@ -7,7 +7,7 @@ import ( ) const ( - newLatencyWeight = 0.5 + newLatencyWeight = 0.3 ) type peerData struct { diff --git a/sessionpeermanager/sessionpeermanager.go b/sessionpeermanager/sessionpeermanager.go index 471e982e..0ea423fb 100644 --- a/sessionpeermanager/sessionpeermanager.go +++ b/sessionpeermanager/sessionpeermanager.go @@ -15,7 +15,6 @@ import ( const ( defaultTimeoutDuration = 5 * time.Second - maxOptimizedPeers = 32 unoptimizedTagValue = 5 // tag value for "unoptimized" session peers. optimizedTagValue = 10 // tag value for "optimized" session peers. ) @@ -279,8 +278,8 @@ type getPeersMessage struct { func (prm *getPeersMessage) handle(spm *SessionPeerManager) { randomOrder := rand.Perm(len(spm.unoptimizedPeersArr)) maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr) - if maxPeers > maxOptimizedPeers { - maxPeers = maxOptimizedPeers + if maxPeers > bssd.MaxOptimizedPeers { + maxPeers = bssd.MaxOptimizedPeers } var bestPeerLatency float64 if len(spm.optimizedPeersArr) > 0 { diff --git a/sessionpeermanager/sessionpeermanager_test.go b/sessionpeermanager/sessionpeermanager_test.go index c0d6512b..6640a597 100644 --- a/sessionpeermanager/sessionpeermanager_test.go +++ b/sessionpeermanager/sessionpeermanager_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + bssd "github.com/ipfs/go-bitswap/sessiondata" "github.com/ipfs/go-bitswap/testutil" cid "github.com/ipfs/go-cid" @@ -182,7 +183,7 @@ func TestOrderingPeers(t *testing.T) { sessionPeerManager.RecordPeerResponse(peer3, c[0]) sessionPeers := sessionPeerManager.GetOptimizedPeers() - if len(sessionPeers) != maxOptimizedPeers { + if len(sessionPeers) != bssd.MaxOptimizedPeers { t.Fatal("Should not return more than the max of optimized peers") } @@ -203,7 +204,7 @@ func TestOrderingPeers(t *testing.T) { } // should other peers rating of zero - for i := 3; i < maxOptimizedPeers; i++ { + for i := 3; i < bssd.MaxOptimizedPeers; i++ { if sessionPeers[i].OptimizationRating != 0.0 { t.Fatal("Did not assign rating to unoptimized peer correctly") } @@ -219,7 +220,7 @@ func TestOrderingPeers(t *testing.T) { // call again nextSessionPeers := sessionPeerManager.GetOptimizedPeers() - if len(nextSessionPeers) != maxOptimizedPeers { + if len(nextSessionPeers) != bssd.MaxOptimizedPeers { t.Fatal("Should not return more than the max of optimized peers") } @@ -231,12 +232,12 @@ func TestOrderingPeers(t *testing.T) { // should randomize other peers totalSame := 0 - for i := 3; i < maxOptimizedPeers; i++ { + for i := 3; i < bssd.MaxOptimizedPeers; i++ { if sessionPeers[i].Peer == nextSessionPeers[i].Peer { totalSame++ } } - if totalSame >= maxOptimizedPeers-3 { + if totalSame >= bssd.MaxOptimizedPeers-3 { t.Fatal("should not return the same random peers each time") } } diff --git a/sessionrequestsplitter/peersFromOptimizedPeers.go b/sessionrequestsplitter/peersFromOptimizedPeers.go new file mode 100644 index 00000000..b8132041 --- /dev/null +++ b/sessionrequestsplitter/peersFromOptimizedPeers.go @@ -0,0 +1,101 @@ +package sessionrequestsplitter + +import ( + "math/rand" + "sync" + + bssd "github.com/ipfs/go-bitswap/sessiondata" + "github.com/libp2p/go-libp2p-core/peer" +) + +type skipMap struct { + skips [bssd.MaxOptimizedPeers]int + start int + previousIndex int + currentIndex int +} + +var skipMapPool = sync.Pool{ + New: func() interface{} { + return new(skipMap) + }, +} + +func newSkipMap(length int) *skipMap { + sm := skipMapPool.Get().(*skipMap) + sm.Reset(length) + return sm +} + +func returnSkipMap(sm *skipMap) { + skipMapPool.Put(sm) +} + +func (sm *skipMap) Reset(length int) { + for i := 0; i < length; i++ { + sm.skips[i] = i + 1 + } + sm.start = 0 + sm.currentIndex = 0 + sm.previousIndex = -1 +} + +func (sm *skipMap) BeginTraverse() { + sm.currentIndex = sm.start + sm.previousIndex = -1 +} + +func (sm *skipMap) Advance() { + sm.previousIndex = sm.currentIndex + sm.currentIndex = sm.skips[sm.currentIndex] +} + +func (sm *skipMap) RemoveCurrent() { + if sm.currentIndex == sm.start { + sm.start = sm.skips[sm.currentIndex] + } else { + sm.skips[sm.previousIndex] = sm.skips[sm.currentIndex] + } +} + +func (sm *skipMap) CurrentIndex() int { + return sm.currentIndex +} + +func transformOptimization(optimizationRating float64) float64 { + return optimizationRating * optimizationRating +} + +func getOptimizationTotal(optimizedPeers []bssd.OptimizedPeer) float64 { + optimizationTotal := 0.0 + for _, optimizedPeer := range optimizedPeers { + optimizationTotal += transformOptimization(optimizedPeer.OptimizationRating) + } + return optimizationTotal +} + +func peersFromOptimizedPeers(optimizedPeers []bssd.OptimizedPeer) []peer.ID { + optimizationTotal := getOptimizationTotal(optimizedPeers) + sm := newSkipMap(len(optimizedPeers)) + peers := make([]peer.ID, 0, len(optimizedPeers)) + for range optimizedPeers { + randValue := rand.Float64() + randTarget := randValue * optimizationTotal + targetSoFar := 0.0 + sm.BeginTraverse() + for { + currentPeer := optimizedPeers[sm.CurrentIndex()] + currentRating := transformOptimization(currentPeer.OptimizationRating) + targetSoFar += currentRating + if targetSoFar+0.000001 >= randTarget { + peers = append(peers, currentPeer.Peer) + optimizationTotal -= currentRating + sm.RemoveCurrent() + break + } + sm.Advance() + } + } + returnSkipMap(sm) + return peers +} diff --git a/sessionrequestsplitter/peersFromOptimizedPeers_test.go b/sessionrequestsplitter/peersFromOptimizedPeers_test.go new file mode 100644 index 00000000..309d6b68 --- /dev/null +++ b/sessionrequestsplitter/peersFromOptimizedPeers_test.go @@ -0,0 +1,68 @@ +package sessionrequestsplitter + +import ( + "fmt" + "math" + "testing" + + bssd "github.com/ipfs/go-bitswap/sessiondata" + "github.com/ipfs/go-bitswap/testutil" + "github.com/libp2p/go-libp2p-core/peer" +) + +func sampleDistributions(optimizedPeers []bssd.OptimizedPeer, sampleSize int) []map[peer.ID]float64 { + sampleCounts := make([]map[peer.ID]int, len(optimizedPeers)) + for i := range optimizedPeers { + sampleCounts[i] = make(map[peer.ID]int) + } + for i := 0; i < sampleSize; i++ { + samplePeers := peersFromOptimizedPeers(optimizedPeers) + for j, peer := range samplePeers { + sampleCounts[j][peer] = sampleCounts[j][peer] + 1 + } + } + sampleMean := make([]map[peer.ID]float64, len(optimizedPeers)) + for i, sampleCount := range sampleCounts { + sampleMean[i] = make(map[peer.ID]float64) + for p, count := range sampleCount { + sampleMean[i][p] = float64(count) / float64(sampleSize) + } + } + return sampleMean +} + +func TestDistributionsFromOptimizations(t *testing.T) { + peers := testutil.GeneratePeers(3) + optimizedPeers := make([]bssd.OptimizedPeer, 0, len(peers)) + for i, peer := range peers { + optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{peer, 1.0 - (float64(i) * 0.4)}) + } + distributions := sampleDistributions(optimizedPeers, 1000) + expectedDistributions := []map[peer.ID]float64{ + map[peer.ID]float64{ + peers[0]: 1.0 / 1.4, + peers[1]: 0.36 / 1.4, + peers[2]: 0.04 / 1.4, + }, + map[peer.ID]float64{ + peers[0]: (0.36/1.4)*(1.0/1.04) + (0.04/1.4)*(1.0/1.36), + peers[1]: (1.0/1.4)*(0.36/0.4) + (0.04/1.4)*(0.36/1.36), + peers[2]: (1.0/1.4)*(0.04/0.4) + (0.36/1.4)*(0.04/1.04), + }, + map[peer.ID]float64{ + peers[0]: (0.04/1.4)*(0.36/1.36) + (0.36/1.4)*(0.04/1.04), + peers[1]: (0.04/1.4)*(1.0/1.36) + (1.0/1.4)*(0.04/0.4), + peers[2]: (0.36/1.4)*(1.0/1.04) + (1.0/1.4)*(0.36/0.4), + }, + } + for i, distribution := range distributions { + expectedDistribution := expectedDistributions[i] + for p, value := range distribution { + expectedValue := expectedDistribution[p] + fmt.Printf("Value: %f, Expected: %f\n", value, expectedValue) + if math.Abs(value-expectedValue) >= 0.02 { + t.Fatal("Distribution did not match expected distribution") + } + } + } +} diff --git a/sessionrequestsplitter/sessionrequestsplitter.go b/sessionrequestsplitter/sessionrequestsplitter.go index 46998244..17937dc1 100644 --- a/sessionrequestsplitter/sessionrequestsplitter.go +++ b/sessionrequestsplitter/sessionrequestsplitter.go @@ -13,7 +13,7 @@ const ( minReceivedToAdjustSplit = 2 maxSplit = 16 maxAcceptableDupes = 0.4 - minDuplesToTryLessSplits = 0.2 + minDuplesToTryLessSplits = 0.1 initialSplit = 2 ) @@ -105,10 +105,7 @@ type splitRequestMessage struct { func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) { split := srs.split // first iteration ignore optimization ratings - peers := make([]peer.ID, len(s.optimizedPeers)) - for i, optimizedPeer := range s.optimizedPeers { - peers[i] = optimizedPeer.Peer - } + peers := peersFromOptimizedPeers(s.optimizedPeers) ks := s.ks if len(peers) < split { split = len(peers)