Skip to content

Commit

Permalink
gc signed peer records too
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 15, 2024
1 parent c7626c0 commit 1765d2e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 78 deletions.
55 changes: 17 additions & 38 deletions core/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,9 @@ type AddrBook interface {
}

// CertifiedAddrBook manages "self-certified" addresses for remote peers.
// Self-certified addresses are contained in peer.PeerRecords
// which are wrapped in a record.Envelope and signed by the peer
// to whom they belong.
//
// Certified addresses (CA) are generally more secure than uncertified
// addresses (UA). Consequently, CAs beat and displace UAs. When the
// peerstore learns CAs for a peer, it will reject UAs for the same peer
// (as long as the former haven't expired).
// Furthermore, peer records act like sequenced snapshots of CAs. Therefore,
// processing a peer record that's newer than the last one seen overwrites
// all addresses with the incoming ones.
// Self-certified addresses are contained in signed peer.PeerRecords.
// Certified addresses are generally more secure than uncertified
// addresses.
//
// This interface is most useful when combined with AddrBook.
// To test whether a given AddrBook / Peerstore implementation supports
Expand All @@ -143,36 +135,23 @@ type AddrBook interface {
// cab.ConsumePeerRecord(signedPeerRecord, aTTL)
// }
type CertifiedAddrBook interface {
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
//
// The 'accepted' return value indicates that the record was successfully processed
// and integrated into the CertifiedAddrBook state. If 'accepted' is false but no
// error is returned, it means that the record was ignored, most likely because
// a newer record exists for the same peer.
//
// Signed records added via this method will be stored without
// alteration as long as the address TTLs remain valid. The Envelopes
// containing the PeerRecords can be retrieved by calling GetPeerRecord(peerID).
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire when
// all addresses associated with the peer have expired. The addresses in provided signed
// peer.PeerRecord are expired after `ttl` duration.
//
// If the signed PeerRecord belongs to a peer that already has certified
// addresses in the CertifiedAddrBook, the new addresses will replace the
// older ones, if the new record has a higher sequence number than the
// existing record. Attempting to add a peer record with a
// sequence number that's <= an existing record for the same peer will not
// result in an error, but the record will be ignored, and the 'accepted'
// bool return value will be false.
// The `accepted` return value indicates that the record was successfully processed. If
// `accepted` is false but no error is returned, it means that the record was ignored, most
// likely because a newer record exists for the same peer.
//
// If the CertifiedAddrBook is also an AddrBook (which is most likely the case),
// adding certified addresses for a peer will *replace* any
// existing non-certified addresses for that peer, and only the certified
// addresses will be returned from AddrBook.Addrs thereafter.
// If the signed peer.PeerRecord belongs to a peer that already has certified addresses in
// the CertifiedAddrBook, and if the new record has a higher sequence number than the
// existing record, the new addresses will be added and the older ones will be kept
// unchanged. Attempting to add a peer record with a sequence number that's lower than an
// existing record will not result in an error, but the record will be ignored, and the
// `accepted` return value will be false.
//
// Likewise, once certified addresses have been added for a given peer,
// any non-certified addresses added via AddrBook.AddAddrs or
// AddrBook.SetAddrs will be ignored. AddrBook.SetAddrs may still be used
// to update the TTL of certified addresses that have previously been
// added via ConsumePeerRecord.
// The Envelopes containing the PeerRecords can be retrieved by calling
// GetPeerRecord(peerID).
ConsumePeerRecord(s *record.Envelope, ttl time.Duration) (accepted bool, err error)

// GetPeerRecord returns an Envelope containing a PeerRecord for the
Expand Down
89 changes: 49 additions & 40 deletions p2p/host/peerstore/pstoremem/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

var SignedPeerRecordBound = 1_000
var SignedPeerRecordBound = 100_000

var log = logging.Logger("peerstore")

Expand All @@ -42,13 +42,13 @@ type peerRecordState struct {
var _ heap.Interface = &peerAddrs{}

type peerAddrs struct {
addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr
Addrs map[peer.ID]map[string]*expiringAddr // peer.ID -> addr.Bytes() -> *expiringAddr
expiringHeap []*expiringAddr
}

func newPeerAddrs() peerAddrs {
return peerAddrs{
addrs: make(map[peer.ID]map[string]*expiringAddr),
Addrs: make(map[peer.ID]map[string]*expiringAddr),
}
}

Expand All @@ -63,10 +63,10 @@ func (pa *peerAddrs) Swap(i, j int) {
}
func (pa *peerAddrs) Push(x any) {
a := x.(*expiringAddr)
if _, ok := pa.addrs[a.Peer]; !ok {
pa.addrs[a.Peer] = make(map[string]*expiringAddr)
if _, ok := pa.Addrs[a.Peer]; !ok {
pa.Addrs[a.Peer] = make(map[string]*expiringAddr)
}
pa.addrs[a.Peer][string(a.Addr.Bytes())] = a
pa.Addrs[a.Peer][string(a.Addr.Bytes())] = a
a.heapIndex = len(pa.expiringHeap)
pa.expiringHeap = append(pa.expiringHeap, a)
}
Expand All @@ -77,10 +77,10 @@ func (pa *peerAddrs) Pop() any {
a.heapIndex = -1
pa.expiringHeap = old[0 : n-1]

if m, ok := pa.addrs[a.Peer]; ok {
if m, ok := pa.Addrs[a.Peer]; ok {
delete(m, string(a.Addr.Bytes()))
if len(m) == 0 {
delete(pa.addrs, a.Peer)
delete(pa.Addrs, a.Peer)
}
}

Expand All @@ -94,16 +94,16 @@ func (pa *peerAddrs) Fix(a *expiringAddr) {
func (pa *peerAddrs) Delete(a *expiringAddr) {
heap.Remove(pa, a.heapIndex)
a.heapIndex = -1
if m, ok := pa.addrs[a.Peer]; ok {
if m, ok := pa.Addrs[a.Peer]; ok {
delete(m, string(a.Addr.Bytes()))
if len(m) == 0 {
delete(pa.addrs, a.Peer)
delete(pa.Addrs, a.Peer)
}
}
}

func (pa *peerAddrs) FindAddr(p peer.ID, addrBytes ma.Multiaddr) (*expiringAddr, bool) {
if m, ok := pa.addrs[p]; ok {
if m, ok := pa.Addrs[p]; ok {
v, ok := m[string(addrBytes.Bytes())]
return v, ok
}
Expand All @@ -117,10 +117,12 @@ func (pa *peerAddrs) NextExpiry() time.Time {
return pa.expiringHeap[len(pa.expiringHeap)-1].Expires
}

func (pa *peerAddrs) gc(now time.Time) {
for len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) {
heap.Pop(pa)
func (pa *peerAddrs) PopIfExpired(now time.Time) (*expiringAddr, bool) {
if len(pa.expiringHeap) > 0 && now.After(pa.NextExpiry()) {
a := heap.Pop(pa)
return a.(*expiringAddr), true
}
return nil, false
}

type clock interface {
Expand Down Expand Up @@ -201,14 +203,20 @@ func (mab *memoryAddrBook) gc() {
now := mab.clock.Now()
mab.mu.Lock()
defer mab.mu.Unlock()
mab.addrs.gc(now)
for {
ea, ok := mab.addrs.PopIfExpired(now)
if !ok {
return
}
mab.maybeDeleteSignedPeerRecordUnlocked(ea.Peer)
}
}

func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice {
mab.mu.RLock()
defer mab.mu.RUnlock()
peers := make(peer.IDSlice, 0, len(mab.addrs.addrs))
for pid := range mab.addrs.addrs {
peers := make(peer.IDSlice, 0, len(mab.addrs.Addrs))
for pid := range mab.addrs.Addrs {
peers = append(peers, pid)
}
return peers
Expand All @@ -219,22 +227,15 @@ func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Durati
mab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}

// AddAddrs gives memoryAddrBook addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// AddAddrs adds `addrs` for peer `p`, which will expire after the given `ttl`.
// This function never reduces the TTL or expiration of an address.
func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// if we have a valid peer record, ignore unsigned addrs
// peerRec := mab.GetPeerRecord(p)
// if peerRec != nil {
// return
// }
mab.addAddrs(p, addrs, ttl)
}

var ErrTooManyRecords = fmt.Errorf("too many signed peer records. Dropping this one")

// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL.
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
r, err := recordEnvelope.Record()
Expand All @@ -249,13 +250,13 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
return false, fmt.Errorf("signing key does not match PeerID in PeerRecord")
}

// ensure seq is greater than, or equal to, the last received
mab.mu.Lock()
defer mab.mu.Unlock()
if (len(mab.signedPeerRecords)) >= SignedPeerRecordBound {
return false, ErrTooManyRecords
}

// ensure seq is greater than or equal to the last received
lastState, found := mab.signedPeerRecords[rec.PeerID]
if found && lastState.Seq > rec.Seq {
return false, nil
Expand All @@ -268,6 +269,12 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
return true, nil
}

func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
if len(mab.addrs.Addrs[p]) == 0 {
delete(mab.signedPeerRecords, p)
}
}

func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mab.mu.Lock()
defer mab.mu.Unlock()
Expand All @@ -276,6 +283,8 @@ func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
}

func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
defer mab.maybeDeleteSignedPeerRecordUnlocked(p)

// if ttl is zero, exit. nothing to do.
if ttl <= 0 {
return
Expand Down Expand Up @@ -330,6 +339,8 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
mab.mu.Lock()
defer mab.mu.Unlock()

defer mab.maybeDeleteSignedPeerRecordUnlocked(p)

exp := mab.clock.Now().Add(ttl)
for _, addr := range addrs {
addr, addrPid := peer.SplitAddr(addr)
Expand All @@ -343,7 +354,6 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
}

if a, found := mab.addrs.FindAddr(p, addr); found {
// re-set all of them for new ttl.
if ttl > 0 {
a.Addr = addr
a.Expires = exp
Expand All @@ -367,9 +377,11 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
mab.mu.Lock()
defer mab.mu.Unlock()
exp := mab.clock.Now().Add(newTTL)

for _, a := range mab.addrs.addrs[p] {
defer mab.maybeDeleteSignedPeerRecordUnlocked(p)

exp := mab.clock.Now().Add(newTTL)
for _, a := range mab.addrs.Addrs[p] {
if oldTTL == a.TTL {
if newTTL == 0 {
mab.addrs.Delete(a)
Expand All @@ -386,11 +398,10 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
mab.mu.RLock()
defer mab.mu.RUnlock()

if _, ok := mab.addrs.addrs[p]; !ok {
if _, ok := mab.addrs.Addrs[p]; !ok {
return nil
}
return validAddrs(mab.clock.Now(), mab.addrs.addrs[p])
return validAddrs(mab.clock.Now(), mab.addrs.Addrs[p])
}

func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr {
Expand All @@ -414,13 +425,11 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope {
mab.mu.RLock()
defer mab.mu.RUnlock()

if _, ok := mab.addrs.addrs[p]; !ok {
if _, ok := mab.addrs.Addrs[p]; !ok {
return nil
}
// although the signed record gets garbage collected when all addrs inside it are expired,
// we may be in between the expiration time and the GC interval
// so, we check to see if we have any valid signed addrs before returning the record
if len(validAddrs(mab.clock.Now(), mab.addrs.addrs[p])) == 0 {
// The record may have expired, but not gargage collected.
if len(validAddrs(mab.clock.Now(), mab.addrs.Addrs[p])) == 0 {
return nil
}

Expand All @@ -437,7 +446,7 @@ func (mab *memoryAddrBook) ClearAddrs(p peer.ID) {
defer mab.mu.Unlock()

delete(mab.signedPeerRecords, p)
for _, a := range mab.addrs.addrs[p] {
for _, a := range mab.addrs.Addrs[p] {
mab.addrs.Delete(a)
}
}
Expand All @@ -448,7 +457,7 @@ func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.
var initial []ma.Multiaddr

mab.mu.RLock()
if m, ok := mab.addrs.addrs[p]; ok {
if m, ok := mab.addrs.Addrs[p]; ok {
initial = make([]ma.Multiaddr, 0, len(m))
for _, a := range m {
initial = append(initial, a.Addr)
Expand Down

0 comments on commit 1765d2e

Please sign in to comment.