diff --git a/benchmarks_test.go b/benchmarks_test.go
index caebcb0b..689512ee 100644
--- a/benchmarks_test.go
+++ b/benchmarks_test.go
@@ -41,25 +41,24 @@ type runStats struct {
 var benchmarkLog []runStats
 
 type bench struct {
-	name string
-	nodeCount int
+	name       string
+	nodeCount  int
 	blockCount int
-	distFn distFunc
-	fetchFn fetchFunc
+	distFn     distFunc
+	fetchFn    fetchFunc
 }
 
-var benches = []bench {
+var benches = []bench{
 	// Fetch from two seed nodes that both have all 100 blocks
 	// - request one at a time, in series
-	bench {"3Nodes-AllToAll-OneAtATime", 3, 100, allToAll, oneAtATime},
+	bench{"3Nodes-AllToAll-OneAtATime", 3, 100, allToAll, oneAtATime},
 	// - request all 100 with a single GetBlocks() call
-	bench {"3Nodes-AllToAll-BigBatch", 3, 100, allToAll, batchFetchAll},
+	bench{"3Nodes-AllToAll-BigBatch", 3, 100, allToAll, batchFetchAll},
 
 	// Fetch from two seed nodes, one at a time, where:
 	// - node A has blocks 0 - 74
 	// - node B has blocks 25 - 99
-	bench {"3Nodes-Overlap1-OneAtATime", 3, 100, overlap1, oneAtATime},
-
+	bench{"3Nodes-Overlap1-OneAtATime", 3, 100, overlap1, oneAtATime},
 
 	// Fetch from two seed nodes, where:
 	// - node A has even blocks
@@ -68,38 +67,38 @@ var benches = []bench {
 	// - request one at a time, in series
 	//   * times out every time potential-threshold reaches 1.0
-	bench {"3Nodes-Overlap3-OneAtATime", 3, 100, overlap2, oneAtATime},
+	bench{"3Nodes-Overlap3-OneAtATime", 3, 100, overlap2, oneAtATime},
 	// - request 10 at a time, in series
-	bench {"3Nodes-Overlap3-BatchBy10", 3, 100, overlap2, batchFetchBy10},
+	bench{"3Nodes-Overlap3-BatchBy10", 3, 100, overlap2, batchFetchBy10},
 	// - request all 100 in parallel as individual GetBlock() calls
-	bench {"3Nodes-Overlap3-AllConcurrent", 3, 100, overlap2, fetchAllConcurrent},
+	bench{"3Nodes-Overlap3-AllConcurrent", 3, 100, overlap2, fetchAllConcurrent},
 	// - request all 100 with a single GetBlocks() call
-	bench {"3Nodes-Overlap3-BigBatch", 3, 100, overlap2, batchFetchAll},
+	bench{"3Nodes-Overlap3-BigBatch", 3, 100, overlap2, batchFetchAll},
 	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
-	bench {"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch},
+	bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch},
 
 	// Fetch from nine seed nodes, all nodes have all blocks
 	// - request one at a time, in series
-	bench {"10Nodes-AllToAll-OneAtATime", 10, 100, allToAll, oneAtATime},
+	bench{"10Nodes-AllToAll-OneAtATime", 10, 100, allToAll, oneAtATime},
 	// - request 10 at a time, in series
-	bench {"10Nodes-AllToAll-BatchFetchBy10", 10, 100, allToAll, batchFetchBy10},
+	bench{"10Nodes-AllToAll-BatchFetchBy10", 10, 100, allToAll, batchFetchBy10},
 	// - request all 100 with a single GetBlocks() call
-	bench {"10Nodes-AllToAll-BigBatch", 10, 100, allToAll, batchFetchAll},
+	bench{"10Nodes-AllToAll-BigBatch", 10, 100, allToAll, batchFetchAll},
 	// - request all 100 in parallel as individual GetBlock() calls
-	bench {"10Nodes-AllToAll-AllConcurrent", 10, 100, allToAll, fetchAllConcurrent},
+	bench{"10Nodes-AllToAll-AllConcurrent", 10, 100, allToAll, fetchAllConcurrent},
 	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
-	bench {"10Nodes-AllToAll-UnixfsFetch", 10, 100, allToAll, unixfsFileFetch},
+	bench{"10Nodes-AllToAll-UnixfsFetch", 10, 100, allToAll, unixfsFileFetch},
 
 	// Fetch from nine seed nodes, blocks are distributed randomly across all nodes (no dups)
 	// - request one at a time, in series
-	bench {"10Nodes-OnePeerPerBlock-OneAtATime", 10, 100, onePeerPerBlock, oneAtATime},
+	bench{"10Nodes-OnePeerPerBlock-OneAtATime", 10, 100, onePeerPerBlock, oneAtATime},
 	// - request all 100 with a single GetBlocks() call
-	bench {"10Nodes-OnePeerPerBlock-BigBatch", 10, 100, onePeerPerBlock, batchFetchAll},
+	bench{"10Nodes-OnePeerPerBlock-BigBatch", 10, 100, onePeerPerBlock, batchFetchAll},
 	// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
-	bench {"10Nodes-OnePeerPerBlock-UnixfsFetch", 10, 100, onePeerPerBlock, unixfsFileFetch},
+	bench{"10Nodes-OnePeerPerBlock-UnixfsFetch", 10, 100, onePeerPerBlock, unixfsFileFetch},
 
 	// Fetch from 19 seed nodes, all nodes have all blocks, fetch all 200 blocks with a single GetBlocks() call
-	bench {"200Nodes-AllToAll-BigBatch", 200, 20, allToAll, batchFetchAll},
+	bench{"200Nodes-AllToAll-BigBatch", 200, 20, allToAll, batchFetchAll},
 }
 
 func BenchmarkDups2Nodes(b *testing.B) {
@@ -118,7 +117,7 @@ func BenchmarkDups2Nodes(b *testing.B) {
 }
 
 func printResults(rs []runStats) {
-	nameOrder := make([]string, 0, 0)
+	nameOrder := make([]string, 0)
 	names := make(map[string]struct{})
 	for i := 0; i < len(rs); i++ {
 		if _, ok := names[rs[i].Name]; !ok {
@@ -147,10 +146,10 @@ func printResults(rs []runStats) {
 		rcvd /= float64(count)
 		dups /= float64(count)
 
-		label := fmt.Sprintf("%s (%d runs / %.2fs):", name, count, elpd / 1000000000.0)
+		label := fmt.Sprintf("%s (%d runs / %.2fs):", name, count, elpd/1000000000.0)
 		fmt.Printf("%-75s %s / sent %d / recv %d / dups %d\n",
 			label,
-			fmtDuration(time.Duration(int64(math.Round(elpd / float64(count))))),
+			fmtDuration(time.Duration(int64(math.Round(elpd/float64(count))))),
 			int64(math.Round(sent)), int64(math.Round(rcvd)), int64(math.Round(dups)))
 	}
@@ -158,11 +157,11 @@
 }
 
 func fmtDuration(d time.Duration) string {
-    d = d.Round(time.Millisecond)
-    s := d / time.Second
-    d -= s * time.Second
-    ms := d / time.Millisecond
-    return fmt.Sprintf("%d.%03ds", s, ms)
+	d = d.Round(time.Millisecond)
+	s := d / time.Second
+	d -= s * time.Second
+	ms := d / time.Millisecond
+	return fmt.Sprintf("%d.%03ds", s, ms)
 }
 
 const fastSpeed = 60 * time.Millisecond
diff --git a/decision/engine.go b/decision/engine.go
index e74f6f22..0c602912 100644
--- a/decision/engine.go
+++ b/decision/engine.go
@@ -9,6 +9,8 @@ import (
 	"github.com/google/uuid"
 
 	bsmsg "github.com/ipfs/go-bitswap/message"
+	pb "github.com/ipfs/go-bitswap/message/pb"
+	wantlist "github.com/ipfs/go-bitswap/wantlist"
 	wl "github.com/ipfs/go-bitswap/wantlist"
 	cid "github.com/ipfs/go-cid"
 	bstore "github.com/ipfs/go-ipfs-blockstore"
@@ -16,8 +18,6 @@ import (
 	"github.com/ipfs/go-peertaskqueue"
 	"github.com/ipfs/go-peertaskqueue/peertask"
 	peer "github.com/libp2p/go-libp2p-core/peer"
-	pb "github.com/ipfs/go-bitswap/message/pb"
-	wantlist "github.com/ipfs/go-bitswap/wantlist"
 )
 
 // TODO consider taking responsibility for other types of requests. For
@@ -212,9 +212,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 			entry, ok := l.WantListContains(c)
 			l.lk.Unlock()
 
-// if ok {
-// 	log.Debugf("wantlist has %s: WantHave %t / SendDontHave: %t", c.String()[2:8], entry.WantHave, entry.SendDontHave)
-// }
+			// if ok {
+			// 	log.Debugf("wantlist has %s: WantHave %t / SendDontHave: %t", c.String()[2:8], entry.WantHave, entry.SendDontHave)
+			// }
 			// If the remote peer wants HAVE or DONT_HAVE messages
 			has := true
 			if ok && (entry.WantType == wantlist.WantType_Have || entry.SendDontHave) {
@@ -226,7 +226,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 
 			// If we have the block, and the remote peer asked for a HAVE
 			if has && entry.WantType == wantlist.WantType_Have {
-// log.Debugf("%s: Have", c.String()[2:8])
+				// log.Debugf("%s: Have", c.String()[2:8])
 				msg.AddHave(c)
 			}
 			// If we don't have the block, and the remote peer asked for a DONT_HAVE
diff --git a/message/message.go b/message/message.go
index 545dbf30..5ac1297b 100644
--- a/message/message.go
+++ b/message/message.go
@@ -207,9 +207,9 @@ func (m *impl) addEntry(c cid.Cid, priority int, cancel bool, wantType wantlist.
 	} else {
 		m.wantlist[c] = &Entry{
 			Entry: wantlist.Entry{
-				Cid: c,
-				Priority: priority,
-				WantType: wantType,
+				Cid:          c,
+				Priority:     priority,
+				WantType:     wantType,
 				SendDontHave: sendDontHave,
 			},
 			Cancel: cancel,
@@ -261,10 +261,10 @@ func (m *impl) ToProtoV0() *pb.Message {
 	pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
 	for _, e := range m.wantlist {
 		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
-			Block: e.Cid.Bytes(),
-			Priority: int32(e.Priority),
-			Cancel: e.Cancel,
-			WantType: go2pb(e.WantType),
+			Block:        e.Cid.Bytes(),
+			Priority:     int32(e.Priority),
+			Cancel:       e.Cancel,
+			WantType:     go2pb(e.WantType),
 			SendDontHave: e.SendDontHave,
 		})
 	}
@@ -283,10 +283,10 @@ func (m *impl) ToProtoV1() *pb.Message {
 	pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
 	for _, e := range m.wantlist {
 		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
-			Block: e.Cid.Bytes(),
-			Priority: int32(e.Priority),
-			Cancel: e.Cancel,
-			WantType: go2pb(e.WantType),
+			Block:        e.Cid.Bytes(),
+			Priority:     int32(e.Priority),
+			Cancel:       e.Cancel,
+			WantType:     go2pb(e.WantType),
 			SendDontHave: e.SendDontHave,
 		})
 	}
diff --git a/peerbroker/peerbroker.go b/peerbroker/peerbroker.go
index ac191944..3cd6b711 100644
--- a/peerbroker/peerbroker.go
+++ b/peerbroker/peerbroker.go
@@ -4,13 +4,13 @@ import (
 	"context"
 	"fmt"
 
-	logging "github.com/ipfs/go-log"
+	// logging "github.com/ipfs/go-log"
 
 	cid "github.com/ipfs/go-cid"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
-var log = logging.Logger("bitswap")
+// var log = logging.Logger("bitswap")
 
 type WantSource interface {
 	MatchWantPeer([]peer.ID) *Want
@@ -31,10 +31,10 @@ type message interface {
 }
 
 type Want struct {
-	Cid cid.Cid
+	Cid       cid.Cid
 	WantHaves []cid.Cid
-	Peer peer.ID
-	Ses uint64
+	Peer      peer.ID
+	Ses       uint64
 }
 
 type PeerBroker struct {
@@ -126,6 +126,7 @@ func (pb *PeerBroker) run() {
 }
 
 const peerBatchSize = 16
+
 func (pb *PeerBroker) checkMatch() {
 	batches := make(map[uint64]map[peer.ID][]Want)
 
diff --git a/session/session.go b/session/session.go
index b06a517e..e5939703 100644
--- a/session/session.go
+++ b/session/session.go
@@ -51,9 +51,9 @@ type RequestSplitter interface {
 }
 
 type rcvFrom struct {
-	from peer.ID
-	ks []cid.Cid
-	haves []cid.Cid
+	from      peer.ID
+	ks        []cid.Cid
+	haves     []cid.Cid
 	dontHaves []cid.Cid
 }
 
@@ -67,8 +67,8 @@ type Session struct {
 	pm  PeerManager
 	srs RequestSplitter
 
-	sw sessionWants
-	pb *bspb.PeerBroker
+	sw    sessionWants
+	pb    *bspb.PeerBroker
 	peers *peer.Set
 
 	// channels
@@ -151,8 +151,8 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontH
 		size := s.peers.Size()
 		s.peers.Add(from)
 		newSize := s.peers.Size()
-		if (newSize > size) {
-			log.Infof("Ses%d: Added %d new peers to session: %d peers\n", s.id, newSize - size, newSize)
+		if newSize > size {
+			log.Infof("Ses%d: Added %d new peers to session: %d peers\n", s.id, newSize-size, newSize)
 		}
 	}
 
@@ -198,10 +198,10 @@ func (s *Session) MatchWantPeer(ps []peer.ID) *bspb.Want {
 	s.pm.RecordPeerRequests([]peer.ID{p}, []cid.Cid{c})
 
 	return &bspb.Want{
-		Cid: c,
+		Cid:       c,
 		WantHaves: wh,
-		Peer: p,
-		Ses: s.id,
+		Peer:      p,
+		Ses:       s.id,
 	}
 }
 
@@ -371,7 +371,7 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
 		// If we didn't get any blocks, but we did get some HAVEs, we must have
 		// discovered at least one peer by now, so signal the PeerBroker to
 		// ask us if we have wants
-        s.pb.WantAvailable()
+		s.pb.WantAvailable()
 	}
 }
 
@@ -391,7 +391,7 @@ func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
 			s.recentWasUnique = s.recentWasUnique[poplen:]
 		}
 		// fmt.Println("recentWasUnique", s.recentWasUnique)
-		if (len(s.recentWasUnique) > 16) {
+		if len(s.recentWasUnique) > 16 {
 			unqCount := 1
 			dupCount := 1
 			for _, u := range s.recentWasUnique {
@@ -443,11 +443,11 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
 	// peers := s.pm.GetOptimizedPeers()
 	// if len(peers) > 0 {
-    // splitRequests := s.srs.SplitRequest(peers, ks)
-    // for _, splitRequest := range splitRequests {
-    // 	s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
-    // 	s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
-    // }
+	// 	splitRequests := s.srs.SplitRequest(peers, ks)
+	// 	for _, splitRequest := range splitRequests {
+	// 		s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
+	// 		s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
+	// 	}
 	if s.peers.Size() > 0 {
 		log.Infof("Ses%d: WantAvailable()\n", s.id)
 		s.pb.WantAvailable()
diff --git a/session/sessionwants.go b/session/sessionwants.go
index 31bf5a7e..e0f2b5c5 100644
--- a/session/sessionwants.go
+++ b/session/sessionwants.go
@@ -1,8 +1,8 @@
 package session
 
 import (
-	"math/rand"
 	"fmt"
+	"math/rand"
 	"sort"
 	"sync"
 	"time"
@@ -17,8 +17,8 @@ type sessionWants struct {
 	liveWants map[cid.Cid]time.Time
 	pastWants *cid.Set
 
-	wantPotential map[cid.Cid]map[peer.ID]float64
-	wantPeers map[cid.Cid][]peer.ID
+	wantPotential      map[cid.Cid]map[peer.ID]float64
+	wantPeers          map[cid.Cid][]peer.ID
 	potentialThreshold float64
 
 	blockPresence map[cid.Cid]map[peer.ID]bool
@@ -26,12 +26,12 @@ func newSessionWants() sessionWants {
 
 func newSessionWants() sessionWants {
 	return sessionWants{
-		liveWants: make(map[cid.Cid]time.Time),
-		toFetch: newCidQueue(),
-		pastWants: cid.NewSet(),
-		wantPotential: make(map[cid.Cid]map[peer.ID]float64),
-		wantPeers: make(map[cid.Cid][]peer.ID),
-		blockPresence: make(map[cid.Cid]map[peer.ID]bool),
+		liveWants:          make(map[cid.Cid]time.Time),
+		toFetch:            newCidQueue(),
+		pastWants:          cid.NewSet(),
+		wantPotential:      make(map[cid.Cid]map[peer.ID]float64),
+		wantPeers:          make(map[cid.Cid][]peer.ID),
+		blockPresence:      make(map[cid.Cid]map[peer.ID]bool),
 		potentialThreshold: 1.5,
 	}
 }
@@ -69,7 +69,7 @@ func (sw *sessionWants) DecreasePotentialThreshold() {
 // measures latency. It returns the CIDs of blocks that were actually wanted
 // (as opposed to duplicates) and the total latency for all incoming blocks.
 func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) {
-// fmt.Println(" ", sw.Stats())
+	// fmt.Println(" ", sw.Stats())
 	now := time.Now()
 
 	sw.Lock()
@@ -97,7 +97,7 @@ func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration
 			sw.clearWantSentToPeer(c)
 		}
 	}
-// fmt.Println(" ", sw.Stats())
+	// fmt.Println(" ", sw.Stats())
 
 	return wanted, totalLatency
 }
@@ -138,7 +138,7 @@ func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
 	defer sw.Unlock()
 
 	// live := make([]cid.Cid, 0, len(sw.liveWants))
-	live := make([]cid.Cid, 0, sw.toFetch.Len() + len(sw.liveWants))
+	live := make([]cid.Cid, 0, sw.toFetch.Len()+len(sw.liveWants))
 	for _, c := range sw.toFetch.Cids() {
 		live = append(live, c)
 		sw.liveWants[c] = now
@@ -271,23 +271,23 @@ func (sw *sessionWants) wasWantSentToPeer(c cid.Cid, p peer.ID) bool {
 }
 
 type potentialGain struct {
-	cid cid.Cid
+	cid       cid.Cid
 	potential float64
-	maxGain float64
-	peer peer.ID
+	maxGain   float64
+	peer      peer.ID
 }
 
 func (pg potentialGain) String() string {
 	return fmt.Sprintf("%s potential %f. Gain for %s: %f", pg.cid.String()[2:8], pg.potential, pg.peer, pg.maxGain)
 }
 
-func potentialGainLess(pgs []potentialGain) func (i, j int) bool {
+func potentialGainLess(pgs []potentialGain) func(i, j int) bool {
 	indices := make(map[cid.Cid]int, len(pgs))
 	for i, pg := range pgs {
 		indices[pg.cid] = i
 	}
 
-	return func (i, j int) bool {
+	return func(i, j int) bool {
 		// Sort by max gain, highest to lowest
 		if pgs[i].maxGain > pgs[j].maxGain {
 			return true
@@ -315,14 +315,14 @@ func potentialGainLess(pgs []potentialGain) func (i, j int) bool {
 }
 
 func (sw *sessionWants) getBestPotentialLiveWant(peers []peer.ID) (cid.Cid, peer.ID, float64) {
-// fmt.Printf("getBestPotentialLiveWant(%s)\n", peers)
+	// fmt.Printf("getBestPotentialLiveWant(%s)\n", peers)
 	bestC := cid.Cid{}
 	bestP := peer.ID("")
 	bestPotential := -1.0
 
 	// Work out the best peer to send each want to, and how big a potential
 	// would be gained
-	pgs := make([]potentialGain, 0, sw.toFetch.Len() + len(sw.liveWants))
+	pgs := make([]potentialGain, 0, sw.toFetch.Len()+len(sw.liveWants))
 	for c := range sw.liveWants {
 		potential := sw.sumWantPotential(c)
 		if potential < sw.potentialThreshold {
@@ -374,7 +374,7 @@ func (sw *sessionWants) getBestPotentialLiveWant(peers []peer.ID) (cid.Cid, peer
 // Amount to increase potential by if we received a HAVE message
 const rcvdHavePotentialGain = 0.8
 
-func (sw * sessionWants) getPotentialGain(c cid.Cid, peers []peer.ID) potentialGain {
+func (sw *sessionWants) getPotentialGain(c cid.Cid, peers []peer.ID) potentialGain {
 	maxGain := -1.0
 	var maxPeer peer.ID
 
@@ -408,7 +408,7 @@ func (sw * sessionWants) getPotentialGain(c cid.Cid, peers []peer.ID) potentialG
 	}
 
 	potential := sw.sumWantPotential(c)
-	return potentialGain { c, potential, maxGain, maxPeer }
+	return potentialGain{c, potential, maxGain, maxPeer}
 }
 
 func (sw *sessionWants) notInWantPeers(cid cid.Cid, peers []peer.ID) []peer.ID {
diff --git a/testnet/virtual.go b/testnet/virtual.go
index 5f4981d9..8421c2db 100644
--- a/testnet/virtual.go
+++ b/testnet/virtual.go
@@ -270,7 +270,7 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
 	tag := tagForPeers(nc.local, p)
 	if _, ok := nc.network.conns[tag]; ok {
 		nc.network.mu.Unlock()
-		// log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)")
+		log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)")
 		return nil
 	}
 	nc.network.conns[tag] = struct{}{}
diff --git a/wantlist/wantlist.go b/wantlist/wantlist.go
index 21521c74..ae4601a1 100644
--- a/wantlist/wantlist.go
+++ b/wantlist/wantlist.go
@@ -20,16 +20,17 @@ type Wantlist struct {
 }
 
 type WantTypeT bool
+
 const (
 	WantType_Block = false
-	WantType_Have = true
+	WantType_Have  = true
 )
 
 // Entry is an entry in a want list, consisting of a cid and its priority
 type Entry struct {
-	Cid cid.Cid
-	Priority int
-	WantType WantTypeT
+	Cid          cid.Cid
+	Priority     int
+	WantType     WantTypeT
 	SendDontHave bool
 }
 
@@ -41,9 +42,9 @@ type sessionTrackedEntry struct {
 // NewRefEntry creates a new reference tracked wantlist entry.
 func NewRefEntry(c cid.Cid, p int, wantType WantTypeT, sendDontHave bool) Entry {
 	return Entry{
-		Cid: c,
-		Priority: p,
-		WantType: wantType,
+		Cid:          c,
+		Priority:     p,
+		WantType:     wantType,
 		SendDontHave: sendDontHave,
 	}
 }
@@ -209,9 +210,9 @@ func (w *Wantlist) Add(c cid.Cid, priority int, wantType WantTypeT, sendDontHave
 	}
 
 	w.set[c] = Entry{
-		Cid: c,
-		Priority: priority,
-		WantType: wantType,
+		Cid:          c,
+		Priority:     priority,
+		WantType:     wantType,
 		SendDontHave: sendDontHave,
 	}
 
diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go
index 69a1ef86..cb1f39b8 100644
--- a/wantmanager/wantmanager.go
+++ b/wantmanager/wantmanager.go
@@ -191,13 +191,13 @@ func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, wantHaves [
 	for i, k := range ks {
 		entries = append(entries, bsmsg.Entry{
 			Cancel: cancel,
-			Entry: wantlist.NewRefEntry(k, maxPriority-i, false, sendDontHave),
+			Entry:  wantlist.NewRefEntry(k, maxPriority-i, false, sendDontHave),
 		})
 	}
 	for i, k := range wantHaves {
 		entries = append(entries, bsmsg.Entry{
 			Cancel: cancel,
-			Entry: wantlist.NewRefEntry(k, maxPriority-i, true, sendDontHave),
+			Entry:  wantlist.NewRefEntry(k, maxPriority-i, true, sendDontHave),
 		})
 	}
 	select {
@@ -277,6 +277,7 @@ type availablePeersMessage struct {
 }
 
 const maxLiveWantsPerPeer = 1024
+
 func (apm *availablePeersMessage) handle(wm *WantManager) {
 	// Very simple rate-limit on peers
 	// TODO: get this from the want list instead of maintaining
diff --git a/workers.go b/workers.go
index 3ddca37c..047d4f29 100644
--- a/workers.go
+++ b/workers.go
@@ -6,11 +6,11 @@ import (
 
 	engine "github.com/ipfs/go-bitswap/decision"
 	bsmsg "github.com/ipfs/go-bitswap/message"
+	pb "github.com/ipfs/go-bitswap/message/pb"
 	cid "github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log"
 	process "github.com/jbenet/goprocess"
 	procctx "github.com/jbenet/goprocess/context"
-	pb "github.com/ipfs/go-bitswap/message/pb"
 )
 
 // TaskWorkerCount is the total number of simultaneous threads sending