Skip to content

Commit

Permalink
peerstore: limit number of non connected peers in addrbook
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 19, 2024
1 parent 9038a72 commit a51ec10
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 51 deletions.
11 changes: 10 additions & 1 deletion core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
RecentlyConnectedAddrTTL = time.Minute * 15

// OwnObservedAddrTTL is used for our own external addresses observed by peers.
//
// Deprecated: observed addresses are maintained till we disconnect from the peer which provided it
OwnObservedAddrTTL = time.Minute * 30
)
Expand Down Expand Up @@ -65,6 +66,10 @@ type Peerstore interface {

// Peers returns all the peer IDs stored across all inner stores.
Peers() peer.IDSlice

// RemovePeer removes all the peer related information except its addresses. To remove the
// addresses use `AddrBook.ClearAddrs` or set the address ttls to 0.
RemovePeer(peer.ID)
}

// PeerMetadata can handle values of any type. Serializing values is
Expand Down Expand Up @@ -133,13 +138,17 @@ type AddrBook interface {
// cab.ConsumePeerRecord(signedPeerRecord, aTTL)
// }
type CertifiedAddrBook interface {
// A CertifiedAddrBook is an address book. To remove peers, use the available
// methods on the AddrBook.
AddrBook
// ConsumePeerRecord stores a signed peer record and the contained addresses for
// for ttl duration.
// ttl duration.
// The addresses contained in the signed peer record will expire after ttl. If any
// address is already present in the peer store, it'll expire at max of existing ttl and
// provided ttl.
// The signed peer record itself will be expired when all the addresses associated with the peer,
// self-certified or not, are removed from the AddrBook.
//
// To delete the signed peer record, use `AddrBook.UpdateAddrs`,`AddrBook.SetAddrs`, or
// `AddrBook.ClearAddrs` with ttl 0.
// Note: Future calls to ConsumePeerRecord will not expire self-certified addresses from the
Expand Down
150 changes: 107 additions & 43 deletions p2p/host/peerstore/pstoremem/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package pstoremem
import (
"container/heap"
"context"
"errors"
"fmt"
"sort"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"

Check failure on line 13 in p2p/host/peerstore/pstoremem/addr_book.go

View workflow job for this annotation

GitHub Actions / go-check / All

package "github.com/libp2p/go-libp2p/core/peerstore" is being imported more than once (ST1019)
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"

Expand All @@ -23,7 +25,7 @@ type expiringAddr struct {
TTL time.Duration
Expiry time.Time
Peer peer.ID
// to sort by expiry time
// to sort by expiry time, -1 means it's not in the heap
heapIndex int
}

Expand Down Expand Up @@ -61,45 +63,31 @@ func (pa *peerAddrs) Swap(i, j int) {
}
func (pa *peerAddrs) Push(x any) {
a := x.(*expiringAddr)
if _, ok := pa.Addrs[a.Peer]; !ok {
pa.Addrs[a.Peer] = make(map[string]*expiringAddr)
}
pa.Addrs[a.Peer][string(a.Addr.Bytes())] = a
a.heapIndex = len(pa.expiringHeap)
pa.expiringHeap = append(pa.expiringHeap, a)
}
func (pa *peerAddrs) Pop() any {
a := pa.expiringHeap[len(pa.expiringHeap)-1]
a.heapIndex = -1
pa.expiringHeap = pa.expiringHeap[0 : len(pa.expiringHeap)-1]

if m, ok := pa.Addrs[a.Peer]; ok {
delete(m, string(a.Addr.Bytes()))
if len(m) == 0 {
delete(pa.Addrs, a.Peer)
}
}
return a
}

func (pa *peerAddrs) Fix(a *expiringAddr) {
heap.Fix(pa, a.heapIndex)
}

func (pa *peerAddrs) Delete(a *expiringAddr) {
heap.Remove(pa, a.heapIndex)
a.heapIndex = -1
if m, ok := pa.Addrs[a.Peer]; ok {
delete(m, string(a.Addr.Bytes()))
if len(m) == 0 {
if ea, ok := pa.Addrs[a.Peer][string(a.Addr.Bytes())]; ok {
if ea.heapIndex != -1 {
heap.Remove(pa, a.heapIndex)
}
delete(pa.Addrs[a.Peer], string(a.Addr.Bytes()))
if len(pa.Addrs[a.Peer]) == 0 {
delete(pa.Addrs, a.Peer)
}
}
}

func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, bool) {
func (pa *peerAddrs) FindAddr(p peer.ID, addr ma.Multiaddr) (*expiringAddr, bool) {
if m, ok := pa.Addrs[p]; ok {
v, ok := m[string(addrBytes.Bytes())]
v, ok := m[string(addr.Bytes())]
return v, ok
}
return nil, false
Expand All @@ -116,11 +104,44 @@ func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) {
// Use `!Before` instead of `After` to ensure that we expire *at* now, and not *just after now*.
if len(pa.expiringHeap) > 0 && !now.Before(pa.NextExpiry()) {
a := heap.Pop(pa)
ea := a.(*expiringAddr)
delete(pa.Addrs[ea.Peer], string(ea.Addr.Bytes()))
if len(pa.Addrs[ea.Peer]) == 0 {
delete(pa.Addrs, ea.Peer)
}
return a.(*expiringAddr), true
}
return nil, false
}

func (pa *peerAddrs) Update(a *expiringAddr) {
if a.heapIndex == -1 {
return
}
if a.TTL >= peerstore.ConnectedAddrTTL {
heap.Remove(pa, a.heapIndex)
} else {
heap.Fix(pa, a.heapIndex)
}
}

func (pa *peerAddrs) Insert(a *expiringAddr) {
a.heapIndex = -1
if _, ok := pa.Addrs[a.Peer]; !ok {
pa.Addrs[a.Peer] = make(map[string]*expiringAddr)
}
pa.Addrs[a.Peer][string(a.Addr.Bytes())] = a
// don't add permanent addr to heap.
if a.TTL >= peerstore.ConnectedAddrTTL {
return
}
heap.Push(pa, a)
}

func (pa *peerAddrs) NumNonConnectedAddrs() int {
return len(pa.expiringHeap)
}

type clock interface {
Now() time.Time
}
Expand All @@ -131,12 +152,18 @@ func (rc realclock) Now() time.Time {
return time.Now()
}

const (
defaultMaxSignedPeerRecords = 100_000
defaultMaxUnconnectedAddrs = 1_000_000
)

// memoryAddrBook manages addresses.
type memoryAddrBook struct {
mu sync.RWMutex
// TODO bound the number of not connected addresses we store.
addrs peerAddrs
signedPeerRecords map[peer.ID]*peerRecordState
mu sync.RWMutex
addrs peerAddrs
signedPeerRecords map[peer.ID]*peerRecordState
maxUnconnectedAddrs int
maxSignedPeerRecords int

refCount sync.WaitGroup
cancel func()
Expand All @@ -152,11 +179,13 @@ func NewAddrBook() *memoryAddrBook {
ctx, cancel := context.WithCancel(context.Background())

ab := &memoryAddrBook{
addrs: newPeerAddrs(),
signedPeerRecords: make(map[peer.ID]*peerRecordState),
subManager: NewAddrSubManager(),
cancel: cancel,
clock: realclock{},
addrs: newPeerAddrs(),
signedPeerRecords: make(map[peer.ID]*peerRecordState),
subManager: NewAddrSubManager(),
cancel: cancel,
clock: realclock{},
maxUnconnectedAddrs: defaultMaxUnconnectedAddrs,
maxSignedPeerRecords: defaultMaxUnconnectedAddrs,
}
ab.refCount.Add(1)
go ab.background(ctx)
Expand All @@ -172,6 +201,20 @@ func WithClock(clock clock) AddrBookOption {
}
}

func WithMaxAddresses(n int) AddrBookOption {
return func(b *memoryAddrBook) error {
b.maxUnconnectedAddrs = n
return nil
}
}

func WithMaxSignedPeerRecords(n int) AddrBookOption {
return func(b *memoryAddrBook) error {
b.maxSignedPeerRecords = n
return nil
}
}

// background periodically schedules a gc
func (mab *memoryAddrBook) background(ctx context.Context) {
defer mab.refCount.Done()
Expand Down Expand Up @@ -252,6 +295,10 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
if found && lastState.Seq > rec.Seq {
return false, nil
}
// check if we are over the max signed peer record limit
if !found && len(mab.signedPeerRecords) >= mab.maxSignedPeerRecords {
return false, errors.New("too many signed peer records")
}
mab.signedPeerRecords[rec.PeerID] = &peerRecordState{
Envelope: recordEnvelope,
Seq: rec.Seq,
Expand Down Expand Up @@ -281,6 +328,11 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl
return
}

// we are over limit, drop these addrs.
if ttl < peerstore.ConnectedAddrTTL && mab.addrs.NumNonConnectedAddrs() >= mab.maxUnconnectedAddrs {
return
}

exp := mab.clock.Now().Add(ttl)
for _, addr := range addrs {
// Remove suffix of /p2p/peer-id from address
Expand All @@ -297,7 +349,7 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl
if !found {
// not found, announce it.
entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}
heap.Push(&mab.addrs, entry)
mab.addrs.Insert(entry)
mab.subManager.BroadcastAddr(p, addr)
} else {
// update ttl & exp to whichever is greater between new and existing entry
Expand All @@ -311,7 +363,7 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl
a.Expiry = exp
}
if changed {
mab.addrs.Fix(a)
mab.addrs.Update(a)
}
}
}
Expand Down Expand Up @@ -344,17 +396,24 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du

if a, found := mab.addrs.FindAddr(p, addr); found {
if ttl > 0 {
a.Addr = addr
a.Expiry = exp
a.TTL = ttl
mab.addrs.Fix(a)
mab.subManager.BroadcastAddr(p, addr)
if a.TTL >= peerstore.ConnectedAddrTTL && ttl < peerstore.ConnectedAddrTTL && mab.addrs.NumNonConnectedAddrs() >= mab.maxUnconnectedAddrs {
mab.addrs.Delete(a)
} else {
a.Addr = addr
a.Expiry = exp
a.TTL = ttl
mab.addrs.Update(a)
mab.subManager.BroadcastAddr(p, addr)
}
} else {
mab.addrs.Delete(a)
}
} else {
if ttl > 0 {
heap.Push(&mab.addrs, &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p})
if ttl < peerstore.ConnectedAddrTTL && mab.addrs.NumNonConnectedAddrs() >= mab.maxUnconnectedAddrs {
continue
}
mab.addrs.Insert(&expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p})
mab.subManager.BroadcastAddr(p, addr)
}
}
Expand All @@ -375,9 +434,14 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
if newTTL == 0 {
mab.addrs.Delete(a)
} else {
a.TTL = newTTL
a.Expiry = exp
mab.addrs.Fix(a)
// We are over limit, drop these addresses.
if oldTTL >= peerstore.ConnectedAddrTTL && newTTL < peerstore.ConnectedAddrTTL && mab.addrs.NumNonConnectedAddrs() >= mab.maxUnconnectedAddrs {
mab.addrs.Delete(a)
} else {
a.TTL = newTTL
a.Expiry = exp
mab.addrs.Update(a)
}
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions p2p/host/peerstore/pstoremem/addr_book_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func TestPeerAddrsNextExpiry(t *testing.T) {
// t1 is before t2
t1 := time.Time{}.Add(1 * time.Second)
t2 := time.Time{}.Add(2 * time.Second)
heap.Push(pa, &expiringAddr{Addr: a1, Expiry: t1, TTL: 10 * time.Second, Peer: "p1"})
heap.Push(pa, &expiringAddr{Addr: a2, Expiry: t2, TTL: 10 * time.Second, Peer: "p2"})
paa.Insert(&expiringAddr{Addr: a1, Expiry: t1, TTL: 10 * time.Second, Peer: "p1"})
paa.Insert(&expiringAddr{Addr: a2, Expiry: t2, TTL: 10 * time.Second, Peer: "p2"})

if pa.NextExpiry() != t1 {
t.Fatal("expiry should be set to t1, got", pa.NextExpiry())
Expand All @@ -49,7 +49,7 @@ func TestPeerAddrsHeapProperty(t *testing.T) {
const N = 10000
expiringAddrs := peerAddrsInput(N)
for i := 0; i < N; i++ {
heap.Push(pa, expiringAddrs[i])
paa.Insert(expiringAddrs[i])
}

for i := 0; i < N; i++ {
Expand All @@ -70,7 +70,7 @@ func TestPeerAddrsHeapPropertyDeletions(t *testing.T) {
const N = 10000
expiringAddrs := peerAddrsInput(N)
for i := 0; i < N; i++ {
heap.Push(pa, expiringAddrs[i])
paa.Insert(expiringAddrs[i])
}

// delete every 3rd element
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestPeerAddrsHeapPropertyUpdates(t *testing.T) {
var endElements []ma.Multiaddr
for i := 0; i < N; i += 3 {
expiringAddrs[i].Expiry = time.Time{}.Add(1000_000 * time.Second)
pa.Fix(expiringAddrs[i])
pa.Update(expiringAddrs[i])
endElements = append(endElements, expiringAddrs[i].Addr)
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func TestPeerAddrsExpiry(t *testing.T) {
expiringAddrs[i].Expiry = time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second)
}
for i := 0; i < N; i++ {
heap.Push(pa, expiringAddrs[i])
pa.Insert(expiringAddrs[i])
}

expiry := time.Time{}.Add(time.Duration(1+rand.Intn(N)) * time.Second)
Expand Down Expand Up @@ -184,7 +184,7 @@ func BenchmarkPeerAddrs(b *testing.B) {
pa := &paa
expiringAddrs := peerAddrsInput(sz)
for i := 0; i < sz; i++ {
heap.Push(pa, expiringAddrs[i])
pa.Insert(expiringAddrs[i])
}
b.StartTimer()
for {
Expand Down

0 comments on commit a51ec10

Please sign in to comment.