From 2318fd2731ffc24db556f50d8466dda19c81f80a Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 28 May 2022 10:25:37 +0200 Subject: [PATCH 1/5] pass an event bus to the swarm constructor --- config/config.go | 11 ++++++++--- p2p/host/basic/basic_host.go | 17 +++++++---------- p2p/net/swarm/swarm.go | 10 ++++++++++ 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index d3824c2a9f..bb1573f104 100644 --- a/config/config.go +++ b/config/config.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" @@ -23,6 +24,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/autorelay" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" routed "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/swarm" @@ -123,7 +125,7 @@ type Config struct { PrometheusRegisterer prometheus.Registerer } -func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) { +func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) { if cfg.Peerstore == nil { return nil, fmt.Errorf("no peerstore specified") } @@ -175,6 +177,7 @@ func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) { opts = append(opts, swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(cfg.PrometheusRegisterer)))) } + opts = append(opts, swarm.WithEventBus(eventBus)) // TODO: Make the swarm implementation configurable. return swarm.NewSwarm(pid, cfg.Peerstore, opts...) } @@ -284,12 +287,14 @@ func (cfg *Config) addTransports(h host.Host) error { // // This function consumes the config. Do not reuse it (really!). func (cfg *Config) NewNode() (host.Host, error) { - swrm, err := cfg.makeSwarm(!cfg.DisableMetrics) + eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) + swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics) if err != nil { return nil, err } h, err := bhost.NewHost(swrm, &bhost.HostOpts{ + EventBus: eventBus, ConnManager: cfg.ConnManager, AddrsFactory: cfg.AddrsFactory, NATManager: cfg.NATManager, @@ -397,7 +402,7 @@ func (cfg *Config) NewNode() (host.Host, error) { Peerstore: ps, } - dialer, err := autoNatCfg.makeSwarm(false) + dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false) if err != nil { h.Close() return nil, err diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 707178522d..a26abc9b94 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -32,7 +32,6 @@ import ( "github.com/libp2p/go-netroute" logging "github.com/ipfs/go-log/v2" - ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" manet "github.com/multiformats/go-multiaddr/net" @@ -108,6 +107,9 @@ var _ host.Host = (*BasicHost)(nil) // HostOpts holds options that can be passed to NewHost in order to // customize construction of the *BasicHost. type HostOpts struct { + // EventBus sets the event bus. Will construct a new event bus if omitted. + EventBus event.Bus + // MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted. MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID] @@ -164,16 +166,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if opts == nil { opts = &HostOpts{} } - - var eventBus event.Bus - if opts.EnableMetrics { - eventBus = eventbus.NewBus( - eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(opts.PrometheusRegisterer)))) - } else { - eventBus = eventbus.NewBus() + if opts.EventBus == nil { + opts.EventBus = eventbus.NewBus() } - psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus) + psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), opts.EventBus) if err != nil { return nil, err } @@ -186,7 +183,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { negtimeout: DefaultNegotiationTimeout, AddrsFactory: DefaultAddrsFactory, maResolver: madns.DefaultResolver, - eventbus: eventBus, + eventbus: opts.EventBus, addrChangeChan: make(chan struct{}, 1), ctx: hostCtx, ctxCancel: cancel, diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 8f317f46fd..5bf1e0af47 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -11,6 +11,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/connmgr" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -99,6 +100,13 @@ func WithResourceManager(m network.ResourceManager) Option { } } +func WithEventBus(b event.Bus) Option { + return func(s *Swarm) error { + s.eventBus = b + return nil + } +} + // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -111,6 +119,8 @@ type Swarm struct { // down before continuing. refs sync.WaitGroup + eventBus event.Bus + rcmgr network.ResourceManager local peer.ID From 631d371ae0147ff641e990ef8bc2057b9592c5aa Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 28 May 2022 10:29:44 +0200 Subject: [PATCH 2/5] make the eventbus parameter a required swarm constructor parameter --- config/config.go | 3 +-- p2p/net/swarm/dial_worker_test.go | 3 ++- p2p/net/swarm/swarm.go | 10 ++-------- p2p/net/swarm/swarm_addr_test.go | 3 ++- p2p/net/swarm/swarm_dial_test.go | 5 +++-- p2p/net/swarm/testing/testing.go | 7 ++++--- p2p/protocol/circuitv2/relay/relay_test.go | 3 ++- 7 files changed, 16 insertions(+), 18 deletions(-) diff --git a/config/config.go b/config/config.go index bb1573f104..70686f7634 100644 --- a/config/config.go +++ b/config/config.go @@ -177,9 +177,8 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa opts = append(opts, swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(cfg.PrometheusRegisterer)))) } - opts = append(opts, swarm.WithEventBus(eventBus)) // TODO: Make the swarm implementation configurable. - return swarm.NewSwarm(pid, cfg.Peerstore, opts...) + return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...) } func (cfg *Config) addTransports(h host.Host) error { diff --git a/p2p/net/swarm/dial_worker_test.go b/p2p/net/swarm/dial_worker_test.go index 88af08d226..2c441106b1 100644 --- a/p2p/net/swarm/dial_worker_test.go +++ b/p2p/net/swarm/dial_worker_test.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" @@ -43,7 +44,7 @@ func makeSwarm(t *testing.T) *Swarm { ps.AddPrivKey(id, priv) t.Cleanup(func() { ps.Close() }) - s, err := NewSwarm(id, ps, WithDialTimeout(time.Second)) + s, err := NewSwarm(id, ps, eventbus.NewBus(), WithDialTimeout(time.Second)) require.NoError(t, err) upgrader := makeUpgrader(t, s) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 5bf1e0af47..65c14f0669 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -100,13 +100,6 @@ func WithResourceManager(m network.ResourceManager) Option { } } -func WithEventBus(b event.Bus) Option { - return func(s *Swarm) error { - s.eventBus = b - return nil - } -} - // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -173,11 +166,12 @@ type Swarm struct { } // NewSwarm constructs a Swarm. -func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm, error) { +func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) { ctx, cancel := context.WithCancel(context.Background()) s := &Swarm{ local: local, peers: peers, + eventBus: eventBus, ctx: ctx, ctxCancel: cancel, dialTimeout: defaultDialTimeout, diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index eb68b818a1..6cb4f96ae1 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/test" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/net/swarm" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" @@ -74,7 +75,7 @@ func TestDialAddressSelection(t *testing.T) { require.NoError(t, err) id, err := peer.IDFromPrivateKey(priv) require.NoError(t, err) - s, err := swarm.NewSwarm("local", nil) + s, err := swarm.NewSwarm("local", nil, eventbus.NewBus()) require.NoError(t, err) tcpTr, err := tcp.NewTCPTransport(nil, nil) diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index 49a4abe1cd..566a2307f4 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/test" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/libp2p/go-libp2p/p2p/transport/websocket" @@ -47,7 +48,7 @@ func TestAddrsForDial(t *testing.T) { tpt, err := websocket.New(nil, &network.NullResourceManager{}) require.NoError(t, err) - s, err := NewSwarm(id, ps, WithMultiaddrResolver(resolver)) + s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(resolver)) require.NoError(t, err) defer s.Close() err = s.AddTransport(tpt) @@ -74,7 +75,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm { ps.AddPubKey(id, priv.GetPublic()) ps.AddPrivKey(id, priv) t.Cleanup(func() { ps.Close() }) - s, err := NewSwarm(id, ps, WithMultiaddrResolver(resolver)) + s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(resolver)) require.NoError(t, err) t.Cleanup(func() { s.Close() diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go index 0f6824c4db..69b107cdb6 100644 --- a/p2p/net/swarm/testing/testing.go +++ b/p2p/net/swarm/testing/testing.go @@ -5,8 +5,6 @@ import ( "testing" "time" - "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" - "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/control" "github.com/libp2p/go-libp2p/core/crypto" @@ -17,11 +15,13 @@ import ( "github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/net/swarm" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" "github.com/libp2p/go-libp2p/p2p/transport/tcp" ma "github.com/multiformats/go-multiaddr" @@ -140,7 +140,8 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { if cfg.connectionGater != nil { swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater)) } - s, err := swarm.NewSwarm(id, ps, swarmOpts...) + + s, err := swarm.NewSwarm(id, ps, eventbus.NewBus(), swarmOpts...) require.NoError(t, err) upgrader := GenUpgrader(t, s, cfg.connectionGater) diff --git a/p2p/protocol/circuitv2/relay/relay_test.go b/p2p/protocol/circuitv2/relay/relay_test.go index a8c56e73c9..a229fe4aac 100644 --- a/p2p/protocol/circuitv2/relay/relay_test.go +++ b/p2p/protocol/circuitv2/relay/relay_test.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/transport" bhost "github.com/libp2p/go-libp2p/p2p/host/blank" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/net/swarm" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" @@ -48,7 +49,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u } bwr := metrics.NewBandwidthCounter() - netw, err := swarm.NewSwarm(p, ps, swarm.WithMetrics(bwr)) + netw, err := swarm.NewSwarm(p, ps, eventbus.NewBus(), swarm.WithMetrics(bwr)) if err != nil { t.Fatal(err) } From b006d83cc046e18e9c9f147985105dd799be6aef Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 28 May 2022 10:50:15 +0200 Subject: [PATCH 3/5] emit Connectedness notifications from the swarm --- p2p/net/swarm/swarm.go | 31 +++++++++++++-- p2p/net/swarm/swarm_event_test.go | 66 +++++++++++++++++++++++++++++++ p2p/net/swarm/testing/testing.go | 14 ++++++- 3 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 p2p/net/swarm/swarm_event_test.go diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 65c14f0669..beec6592b0 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -112,7 +112,7 @@ type Swarm struct { // down before continuing. refs sync.WaitGroup - eventBus event.Bus + emitter event.Emitter rcmgr network.ResourceManager @@ -167,11 +167,15 @@ type Swarm struct { // NewSwarm constructs a Swarm. func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) { + emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged)) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) s := &Swarm{ local: local, peers: peers, - eventBus: eventBus, + emitter: emitter, ctx: ctx, ctxCancel: cancel, dialTimeout: defaultDialTimeout, @@ -207,6 +211,8 @@ func (s *Swarm) Close() error { func (s *Swarm) close() { s.ctxCancel() + s.emitter.Close() + // Prevents new connections and/or listeners from being added to the swarm. s.listeners.Lock() listeners := s.listeners.m @@ -323,6 +329,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } c.streams.m = make(map[*Stream]struct{}) + isFirstConn := len(s.conns.m[p]) == 0 s.conns.m[p] = append(s.conns.m[p], c) // Add two swarm refs: @@ -340,6 +347,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, }) c.notifyLk.Unlock() + if isFirstConn { + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.Connected, + }) + } + c.start() return c, nil } @@ -613,13 +627,14 @@ func (s *Swarm) StopNotify(f network.Notifiee) { func (s *Swarm) removeConn(c *Conn) { p := c.RemotePeer() + var disconnected bool s.conns.Lock() - defer s.conns.Unlock() cs := s.conns.m[p] for i, ci := range cs { if ci == c { if len(cs) == 1 { delete(s.conns.m, p) + disconnected = true } else { // NOTE: We're intentionally preserving order. // This way, connections to a peer are always @@ -628,9 +643,17 @@ func (s *Swarm) removeConn(c *Conn) { cs[len(cs)-1] = nil s.conns.m[p] = cs[:len(cs)-1] } - return + break } } + s.conns.Unlock() + + if disconnected { + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.NotConnected, + }) + } } // String returns a string representation of Network. diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go new file mode 100644 index 0000000000..7d4fb6bd5d --- /dev/null +++ b/p2p/net/swarm/swarm_event_test.go @@ -0,0 +1,66 @@ +package swarm_test + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" + . "github.com/libp2p/go-libp2p/p2p/net/swarm" + swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func newSwarmWithSubscription(t *testing.T) (*Swarm, event.Subscription) { + t.Helper() + bus := eventbus.NewBus() + sw := swarmt.GenSwarm(t, swarmt.EventBus(bus)) + t.Cleanup(func() { sw.Close() }) + sub, err := bus.Subscribe(new(event.EvtPeerConnectednessChanged)) + require.NoError(t, err) + t.Cleanup(func() { sub.Close() }) + return sw, sub +} + +func checkEvent(t *testing.T, sub event.Subscription, expected event.EvtPeerConnectednessChanged) { + t.Helper() + select { + case ev, ok := <-sub.Out(): + require.True(t, ok) + evt := ev.(event.EvtPeerConnectednessChanged) + require.Equal(t, expected.Connectedness, evt.Connectedness, "wrong connectedness state") + require.Equal(t, expected.Peer, evt.Peer) + case <-time.After(time.Second): + t.Fatal("didn't get PeerConnectedness event") + } + + // check that there are no more events + select { + case <-sub.Out(): + t.Fatal("didn't expect any more events") + case <-time.After(100 * time.Millisecond): + return + } +} + +func TestConnectednessEventsSingleConn(t *testing.T) { + s1, sub1 := newSwarmWithSubscription(t) + s2, sub2 := newSwarmWithSubscription(t) + + s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{s2.ListenAddresses()[0]}, time.Hour) + _, err := s1.DialPeer(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + + checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.Connected}) + checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.Connected}) + + for _, c := range s2.ConnsToPeer(s1.LocalPeer()) { + require.NoError(t, c.Close()) + } + checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.NotConnected}) + checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.NotConnected}) +} diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go index 69b107cdb6..b052a993ad 100644 --- a/p2p/net/swarm/testing/testing.go +++ b/p2p/net/swarm/testing/testing.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/control" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -36,6 +37,7 @@ type config struct { connectionGater connmgr.ConnectionGater sk crypto.PrivKey swarmOpts []swarm.Option + eventBus event.Bus clock } @@ -99,6 +101,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option { } } +func EventBus(b event.Bus) Option { + return func(_ *testing.T, c *config) { + c.eventBus = b + } +} + // GenUpgrader creates a new connection upgrader for use with this swarm. func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater, opts ...tptu.Option) transport.Upgrader { id := n.LocalPeer() @@ -141,7 +149,11 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater)) } - s, err := swarm.NewSwarm(id, ps, eventbus.NewBus(), swarmOpts...) + eventBus := cfg.eventBus + if eventBus == nil { + eventBus = eventbus.NewBus() + } + s, err := swarm.NewSwarm(id, ps, eventBus, swarmOpts...) require.NoError(t, err) upgrader := GenUpgrader(t, s, cfg.connectionGater) From 9f32bc22694b4837de03ef056b575832e9e96f6f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 28 May 2022 11:41:48 +0200 Subject: [PATCH 4/5] remove peer connectedness watchers from hosts --- p2p/host/basic/basic_host.go | 5 -- p2p/host/basic/peer_connectedness.go | 71 ----------------------- p2p/host/basic/peer_connectedness_test.go | 48 --------------- p2p/host/blank/blank.go | 5 -- p2p/host/blank/peer_connectedness.go | 71 ----------------------- p2p/host/blank/peer_connectedness_test.go | 46 --------------- 6 files changed, 246 deletions(-) delete mode 100644 p2p/host/basic/peer_connectedness.go delete mode 100644 p2p/host/basic/peer_connectedness_test.go delete mode 100644 p2p/host/blank/peer_connectedness.go delete mode 100644 p2p/host/blank/peer_connectedness_test.go diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index a26abc9b94..de6b8e5bae 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -198,11 +198,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { return nil, err } - evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{}) - if err != nil { - return nil, err - } - h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged)) if !h.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) diff --git a/p2p/host/basic/peer_connectedness.go b/p2p/host/basic/peer_connectedness.go deleted file mode 100644 index bfc46ed8f8..0000000000 --- a/p2p/host/basic/peer_connectedness.go +++ /dev/null @@ -1,71 +0,0 @@ -package basichost - -import ( - "sync" - - "github.com/libp2p/go-libp2p/core/event" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - - ma "github.com/multiformats/go-multiaddr" -) - -type peerConnectWatcher struct { - emitter event.Emitter - - mutex sync.Mutex - connected map[peer.ID]struct{} -} - -var _ network.Notifiee = &peerConnectWatcher{} - -func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher { - return &peerConnectWatcher{ - emitter: emitter, - connected: make(map[peer.ID]struct{}), - } -} - -func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {} -func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {} - -func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) { - p := conn.RemotePeer() - w.handleTransition(p, n.Connectedness(p)) -} - -func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) { - p := conn.RemotePeer() - w.handleTransition(p, n.Connectedness(p)) -} - -func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) { - if changed := w.checkTransition(p, state); !changed { - return - } - w.emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: p, - Connectedness: state, - }) -} - -func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool { - w.mutex.Lock() - defer w.mutex.Unlock() - switch state { - case network.Connected: - if _, ok := w.connected[p]; ok { - return false - } - w.connected[p] = struct{}{} - return true - case network.NotConnected: - if _, ok := w.connected[p]; ok { - delete(w.connected, p) - return true - } - return false - default: - return false - } -} diff --git a/p2p/host/basic/peer_connectedness_test.go b/p2p/host/basic/peer_connectedness_test.go deleted file mode 100644 index f124b10926..0000000000 --- a/p2p/host/basic/peer_connectedness_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package basichost - -import ( - "context" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/event" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - - "github.com/stretchr/testify/require" -) - -func TestPeerConnectedness(t *testing.T) { - h1, err := NewHost(swarmt.GenSwarm(t), nil) - require.NoError(t, err) - defer h1.Close() - h2, err := NewHost(swarmt.GenSwarm(t), nil) - require.NoError(t, err) - - sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) - require.NoError(t, err) - defer sub1.Close() - sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) - require.NoError(t, err) - defer sub2.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})) - require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ - Peer: h2.ID(), - Connectedness: network.Connected, - }) - require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ - Peer: h1.ID(), - Connectedness: network.Connected, - }) - - // now close h2. This will disconnect it from h1. - require.NoError(t, h2.Close()) - require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ - Peer: h2.ID(), - Connectedness: network.NotConnected, - }) -} diff --git a/p2p/host/blank/blank.go b/p2p/host/blank/blank.go index e264e5bb16..9f3daeff23 100644 --- a/p2p/host/blank/blank.go +++ b/p2p/host/blank/blank.go @@ -78,11 +78,6 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost { if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { return nil } - evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{}) - if err != nil { - return nil - } - n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged)) n.SetStreamHandler(bh.newStreamHandler) diff --git a/p2p/host/blank/peer_connectedness.go b/p2p/host/blank/peer_connectedness.go deleted file mode 100644 index 4f70540fca..0000000000 --- a/p2p/host/blank/peer_connectedness.go +++ /dev/null @@ -1,71 +0,0 @@ -package blankhost - -import ( - "sync" - - "github.com/libp2p/go-libp2p/core/event" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - - ma "github.com/multiformats/go-multiaddr" -) - -type peerConnectWatcher struct { - emitter event.Emitter - - mutex sync.Mutex - connected map[peer.ID]struct{} -} - -var _ network.Notifiee = &peerConnectWatcher{} - -func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher { - return &peerConnectWatcher{ - emitter: emitter, - connected: make(map[peer.ID]struct{}), - } -} - -func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {} -func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {} - -func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) { - p := conn.RemotePeer() - w.handleTransition(p, n.Connectedness(p)) -} - -func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) { - p := conn.RemotePeer() - w.handleTransition(p, n.Connectedness(p)) -} - -func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) { - if changed := w.checkTransition(p, state); !changed { - return - } - w.emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: p, - Connectedness: state, - }) -} - -func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool { - w.mutex.Lock() - defer w.mutex.Unlock() - switch state { - case network.Connected: - if _, ok := w.connected[p]; ok { - return false - } - w.connected[p] = struct{}{} - return true - case network.NotConnected: - if _, ok := w.connected[p]; ok { - delete(w.connected, p) - return true - } - return false - default: - return false - } -} diff --git a/p2p/host/blank/peer_connectedness_test.go b/p2p/host/blank/peer_connectedness_test.go deleted file mode 100644 index 260daa1e7d..0000000000 --- a/p2p/host/blank/peer_connectedness_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package blankhost - -import ( - "context" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/event" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" - - "github.com/stretchr/testify/require" -) - -func TestPeerConnectedness(t *testing.T) { - h1 := NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - h2 := NewBlankHost(swarmt.GenSwarm(t)) - - sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) - require.NoError(t, err) - defer sub1.Close() - sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) - require.NoError(t, err) - defer sub2.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})) - require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ - Peer: h2.ID(), - Connectedness: network.Connected, - }) - require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ - Peer: h1.ID(), - Connectedness: network.Connected, - }) - - // now close h2. This will disconnect it from h1. - require.NoError(t, h2.Close()) - require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{ - Peer: h2.ID(), - Connectedness: network.NotConnected, - }) -} From 7889a65ecefc1b1f3de706880dd1a1491c57cc6d Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 5 Feb 2023 17:17:18 +1300 Subject: [PATCH 5/5] swarm: emit connectedness events when holding the mutex --- p2p/net/swarm/swarm.go | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index beec6592b0..cd19e726ed 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -329,7 +329,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } c.streams.m = make(map[*Stream]struct{}) - isFirstConn := len(s.conns.m[p]) == 0 + if len(s.conns.m[p]) == 0 { // first connection + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.Connected, + }) + } s.conns.m[p] = append(s.conns.m[p], c) // Add two swarm refs: @@ -347,13 +352,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, }) c.notifyLk.Unlock() - if isFirstConn { - s.emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: p, - Connectedness: network.Connected, - }) - } - c.start() return c, nil } @@ -627,14 +625,18 @@ func (s *Swarm) StopNotify(f network.Notifiee) { func (s *Swarm) removeConn(c *Conn) { p := c.RemotePeer() - var disconnected bool s.conns.Lock() + defer s.conns.Unlock() + cs := s.conns.m[p] for i, ci := range cs { if ci == c { if len(cs) == 1 { delete(s.conns.m, p) - disconnected = true + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.NotConnected, + }) } else { // NOTE: We're intentionally preserving order. // This way, connections to a peer are always @@ -646,14 +648,6 @@ func (s *Swarm) removeConn(c *Conn) { break } } - s.conns.Unlock() - - if disconnected { - s.emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: p, - Connectedness: network.NotConnected, - }) - } } // String returns a string representation of Network.