Skip to content

Commit

Permalink
identify: cache the snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Feb 7, 2023
1 parent 313b080 commit 4f1ccc3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {

h.updateLocalIpAddr()

if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
Expand Down
35 changes: 25 additions & 10 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ type idService struct {
evtPeerIdentificationFailed event.Emitter
}

currentSnapshot struct {
sync.Mutex
snapshot *identifySnapshot
}

addPeerHandlerCh chan addPeerHandlerReq
rmPeerHandlerCh chan rmPeerHandlerReq

Expand Down Expand Up @@ -180,6 +185,8 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
// register protocols that do not depend on peer records.
h.SetStreamHandler(ID, s.sendIdentifyResp)
h.SetStreamHandler(IDPush, s.pushHandler)
// do this after adding the stream handlers, so that these protocols will be included
s.updateSnapshot()

h.Network().Notify((*netNotifiee)(s))
return s, nil
Expand Down Expand Up @@ -261,6 +268,7 @@ func (ids *idService) loop() {
if !more {
return
}
ids.updateSnapshot()
switch e.(type) {
case event.EvtLocalAddressesUpdated:
for pid := range phs {
Expand Down Expand Up @@ -458,20 +466,29 @@ func readAllIDMessages(r pbio.Reader, finalMsg proto.Message) error {
return fmt.Errorf("too many parts")
}

func (ids *idService) getSnapshot() *identifySnapshot {
snapshot := new(identifySnapshot)
func (ids *idService) updateSnapshot() {
log.Debug("updating Identify snapshot")
snapshot := &identifySnapshot{
addrs: ids.Host.Addrs(),
protocols: ids.Host.Mux().Protocols(),
}
if !ids.disableSignedPeerRecord {
if cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()); ok {
snapshot.record = cab.GetPeerRecord(ids.Host.ID())
}
}
snapshot.addrs = ids.Host.Addrs()
snapshot.protocols = ids.Host.Mux().Protocols()
return snapshot

ids.currentSnapshot.Lock()
defer ids.currentSnapshot.Unlock()
ids.currentSnapshot.snapshot = snapshot
}

func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, s network.Stream) error {
snapshot := ids.getSnapshot()
ids.currentSnapshot.Lock()
snapshot := ids.currentSnapshot.snapshot
ids.currentSnapshot.Unlock()
log.Debugw("sending snapshot with protocols", "protos", snapshot.protocols)

mes := ids.createBaseIdentifyResponse(c, snapshot)
sr := ids.getSignedRecord(snapshot)
mes.SignedPeerRecord = sr
Expand All @@ -480,15 +497,13 @@ func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, s network.Stream)
if sr == nil || proto.Size(mes) <= legacyIDSize {
return writer.WriteMsg(mes)
}

mes.SignedPeerRecord = nil
if err := writer.WriteMsg(mes); err != nil {
return err
}

// then write just the signed record
m := &pb.Identify{SignedPeerRecord: sr}
err := writer.WriteMsg(m)
return err
return writer.WriteMsg(&pb.Identify{SignedPeerRecord: sr})
}

func (ids *idService) createBaseIdentifyResponse(
Expand Down

0 comments on commit 4f1ccc3

Please sign in to comment.