diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 0950a803e9..958dd8763b 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -183,6 +183,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { return nil, err } + evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{}) + if err != nil { + return nil, err + } + h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged)) if !h.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) diff --git a/p2p/host/basic/peer_connectedness.go b/p2p/host/basic/peer_connectedness.go new file mode 100644 index 0000000000..a0ba17c3da --- /dev/null +++ b/p2p/host/basic/peer_connectedness.go @@ -0,0 +1,73 @@ +package basichost + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +type peerConnectWatcher struct { + emitter event.Emitter + + mutex sync.Mutex + connected map[peer.ID]struct{} +} + +var _ network.Notifiee = &peerConnectWatcher{} + +func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher { + return &peerConnectWatcher{ + emitter: emitter, + connected: make(map[peer.ID]struct{}), + } +} + +func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {} +func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {} +func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {} +func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {} + +func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) { + p := conn.RemotePeer() + w.handleTransition(p, n.Connectedness(p)) +} + +func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) { + p := conn.RemotePeer() + w.handleTransition(p, n.Connectedness(p)) +} + +func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) { + if changed := w.checkTransition(p, state); !changed { + return + } + w.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: state, + }) +} + +func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool { + w.mutex.Lock() + defer w.mutex.Unlock() + switch state { + case network.Connected: + if _, ok := w.connected[p]; ok { + return false + } + w.connected[p] = struct{}{} + return true + case network.NotConnected: + if _, ok := w.connected[p]; ok { + delete(w.connected, p) + return true + } + return false + default: + return false + } +}