Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: emit PeerConnectedness event from swarm instead of from hosts #1574

Merged
merged 5 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
Expand Down Expand Up @@ -123,7 +125,7 @@ type Config struct {
PrometheusRegisterer prometheus.Registerer
}

func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
if cfg.Peerstore == nil {
return nil, fmt.Errorf("no peerstore specified")
}
Expand Down Expand Up @@ -176,7 +178,7 @@ func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(cfg.PrometheusRegisterer))))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) addTransports(h host.Host) error {
Expand Down Expand Up @@ -284,12 +286,14 @@ func (cfg *Config) addTransports(h host.Host) error {
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
swrm, err := cfg.makeSwarm(!cfg.DisableMetrics)
eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}

h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
Expand Down Expand Up @@ -397,7 +401,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
Peerstore: ps,
}

dialer, err := autoNatCfg.makeSwarm(false)
dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
if err != nil {
h.Close()
return nil, err
Expand Down
22 changes: 7 additions & 15 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/libp2p/go-netroute"

logging "github.com/ipfs/go-log/v2"

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -108,6 +107,9 @@ var _ host.Host = (*BasicHost)(nil)
// HostOpts holds options that can be passed to NewHost in order to
// customize construction of the *BasicHost.
type HostOpts struct {
// EventBus sets the event bus. Will construct a new event bus if omitted.
Copy link
Contributor

@vyzo vyzo May 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel uneasy about this.
Maybe we should add the EventBus method to the Network interface and forward from the host so that we cant end up with two of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really how this could happen, but we can make this change.

Can we defer this until we consolidated all the other repos as well (#1566)? It's happening in the next release.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option would be to just make this entire thing private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of having both a Config and HostOpts. I expect us to merge these two (or refactor them) in some other way when we start using Fx for service construction (#1993). I suggest not making any big changes in this PR.

EventBus event.Bus

// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID]

Expand Down Expand Up @@ -164,16 +166,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if opts == nil {
opts = &HostOpts{}
}

var eventBus event.Bus
if opts.EnableMetrics {
eventBus = eventbus.NewBus(
eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(opts.PrometheusRegisterer))))
} else {
eventBus = eventbus.NewBus()
if opts.EventBus == nil {
opts.EventBus = eventbus.NewBus()
}

psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus)
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), opts.EventBus)
if err != nil {
return nil, err
}
Expand All @@ -186,7 +183,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventBus,
eventbus: opts.EventBus,
addrChangeChan: make(chan struct{}, 1),
ctx: hostCtx,
ctxCancel: cancel,
Expand All @@ -201,11 +198,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}
h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))

if !h.disableSignedPeerRecord {
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore())
Expand Down
71 changes: 0 additions & 71 deletions p2p/host/basic/peer_connectedness.go

This file was deleted.

48 changes: 0 additions & 48 deletions p2p/host/basic/peer_connectedness_test.go

This file was deleted.

5 changes: 0 additions & 5 deletions p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil
}
evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil
}
n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))

n.SetStreamHandler(bh.newStreamHandler)

Expand Down
71 changes: 0 additions & 71 deletions p2p/host/blank/peer_connectedness.go

This file was deleted.

46 changes: 0 additions & 46 deletions p2p/host/blank/peer_connectedness_test.go

This file was deleted.

3 changes: 2 additions & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
Expand Down Expand Up @@ -43,7 +44,7 @@ func makeSwarm(t *testing.T) *Swarm {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

s, err := NewSwarm(id, ps, WithDialTimeout(time.Second))
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithDialTimeout(time.Second))
require.NoError(t, err)

upgrader := makeUpgrader(t, s)
Expand Down
Loading