From 889617043c62bc7994df83161a69775ecdd88e7b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 25 Oct 2021 15:10:40 +0200 Subject: [PATCH] emit the EvtPeerConnectednessChanged event --- p2p/host/basic/basic_host.go | 5 ++ p2p/host/basic/peer_connectedness.go | 73 +++++++++++++++++++++++ p2p/host/basic/peer_connectedness_test.go | 49 +++++++++++++++ 3 files changed, 127 insertions(+) create mode 100644 p2p/host/basic/peer_connectedness.go create mode 100644 p2p/host/basic/peer_connectedness_test.go diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 0950a803e9..958dd8763b 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -183,6 +183,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { return nil, err } + evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{}) + if err != nil { + return nil, err + } + h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged)) if !h.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) diff --git a/p2p/host/basic/peer_connectedness.go b/p2p/host/basic/peer_connectedness.go new file mode 100644 index 0000000000..a0ba17c3da --- /dev/null +++ b/p2p/host/basic/peer_connectedness.go @@ -0,0 +1,73 @@ +package basichost + +import ( + "sync" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +type peerConnectWatcher struct { + emitter event.Emitter + + mutex sync.Mutex + connected map[peer.ID]struct{} +} + +var _ network.Notifiee = &peerConnectWatcher{} + +func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher { + return &peerConnectWatcher{ + emitter: emitter, + connected: make(map[peer.ID]struct{}), + } +} + +func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {} +func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {} +func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {} +func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {} + +func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) { + p := conn.RemotePeer() + w.handleTransition(p, n.Connectedness(p)) +} + +func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) { + p := conn.RemotePeer() + w.handleTransition(p, n.Connectedness(p)) +} + +func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) { + if changed := w.checkTransition(p, state); !changed { + return + } + w.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: state, + }) +} + +func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool { + w.mutex.Lock() + defer w.mutex.Unlock() + switch state { + case network.Connected: + if _, ok := w.connected[p]; ok { + return false + } + w.connected[p] = struct{}{} + return true + case network.NotConnected: + if _, ok := w.connected[p]; ok { + delete(w.connected, p) + return true + } + return false + default: + return false + } +} diff --git a/p2p/host/basic/peer_connectedness_test.go b/p2p/host/basic/peer_connectedness_test.go new file mode 100644 index 0000000000..9b44302282 --- /dev/null +++ b/p2p/host/basic/peer_connectedness_test.go @@ -0,0 +1,49 @@ +package basichost + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/peer" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + + "github.com/stretchr/testify/require" +) + +func TestPeerConnectedness(t *testing.T) { + h1, err := NewHost(swarmt.GenSwarm(t), nil) + require.NoError(t, err) + defer h1.Close() + h2, err := NewHost(swarmt.GenSwarm(t), nil) + require.NoError(t, err) + + sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) + require.NoError(t, err) + defer sub1.Close() + sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) + require.NoError(t, err) + defer sub2.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})) + require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ + Peer: h2.ID(), + Connectedness: network.Connected, + }) + require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ + Peer: h1.ID(), + Connectedness: network.Connected, + }) + + // now close h2. This will disconnect it from h1. + require.NoError(t, h2.Close()) + require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ + Peer: h2.ID(), + Connectedness: network.NotConnected, + }) +}