diff --git a/config/config.go b/config/config.go index d0a71664f7..ce8a2e420e 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net" + "slices" "time" "github.com/libp2p/go-libp2p/core/connmgr" @@ -432,16 +433,6 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B if err != nil { return nil, err } - if cfg.Relay { - // If we've enabled the relay, we should filter out relay - // addresses by default. - // - // TODO: We shouldn't be doing this here. - originalAddrFactory := h.AddrsFactory - h.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { - return originalAddrFactory(autorelay.Filter(addrs)) - } - } return h, nil } @@ -514,17 +505,8 @@ func (cfg *Config) NewNode() (host.Host, error) { ) } - // originalAddrFactory is the AddrFactory before it's modified by autorelay - // we need this for checking reachability via autonat - originalAddrFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr { - return addrs - } - // enable autorelay fxopts = append(fxopts, - fx.Invoke(func(h *bhost.BasicHost) { - originalAddrFactory = h.AddrsFactory - }), fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) error { if cfg.EnableAutoRelay { if !cfg.DisableMetrics { @@ -561,7 +543,7 @@ func (cfg *Config) NewNode() (host.Host, error) { return nil, err } - if err := cfg.addAutoNAT(bh, originalAddrFactory); err != nil { + if err := cfg.addAutoNAT(bh); err != nil { app.Stop(context.Background()) if cfg.Routing != nil { rh.Close() @@ -577,11 +559,20 @@ func (cfg *Config) NewNode() (host.Host, error) { return &closableBasicHost{App: app, BasicHost: bh}, nil } -func (cfg *Config) addAutoNAT(h *bhost.BasicHost, addrF AddrsFactory) error { +func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { + // Only use public addresses for autonat + addrFunc := func() []ma.Multiaddr { + return slices.DeleteFunc(h.AllAddrs(), func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) + } + if cfg.AddrsFactory != nil { + addrFunc = func() []ma.Multiaddr { + return slices.DeleteFunc( + cfg.AddrsFactory(h.AllAddrs()), + func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) }) + } + } autonatOpts := []autonat.Option{ - autonat.UsingAddresses(func() []ma.Multiaddr { - return addrF(h.AllAddrs()) - }), + autonat.UsingAddresses(addrFunc), } if !cfg.DisableMetrics { autonatOpts = append(autonatOpts, autonat.WithMetricsTracer( @@ -664,7 +655,7 @@ func (cfg *Config) addAutoNAT(h *bhost.BasicHost, addrF AddrsFactory) error { autonat, err := autonat.New(h, autonatOpts...) if err != nil { - return fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err) + return fmt.Errorf("autonat init failed: %w", err) } h.SetAutoNat(autonat) return nil diff --git a/core/host/host.go b/core/host/host.go index 7990f7f456..0a8dbe4b0c 100644 --- a/core/host/host.go +++ b/core/host/host.go @@ -41,7 +41,7 @@ type Host interface { // given peer.ID. Connect will absorb the addresses in pi into its internal // peerstore. If there is not an active connection, Connect will issue a // h.Network.Dial, and block until a connection is open, or an error is - // returned. // TODO: Relay + NAT. + // returned. Connect(ctx context.Context, pi peer.AddrInfo) error // SetStreamHandler sets the protocol handler on the Host's Mux. diff --git a/p2p/host/autonat/autonat.go b/p2p/host/autonat/autonat.go index 479f31ecfb..ce009e79c7 100644 --- a/p2p/host/autonat/autonat.go +++ b/p2p/host/autonat/autonat.go @@ -40,11 +40,12 @@ type AmbientAutoNAT struct { // If it is <3, then multiple autoNAT peers may be contacted for dialback // If only a single autoNAT peer is known, then the confidence increases // for each failure until it reaches 3. - confidence int - lastInbound time.Time - lastProbeTry time.Time - lastProbe time.Time - recentProbes map[peer.ID]time.Time + confidence int + lastInbound time.Time + lastProbe time.Time + recentProbes map[peer.ID]time.Time + pendingProbes int + ourAddrs map[string]struct{} service *autoNATService @@ -70,7 +71,11 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { return nil, err } if conf.addressFunc == nil { - conf.addressFunc = h.Addrs + if aa, ok := h.(interface{ AllAddrs() []ma.Multiaddr }); ok { + conf.addressFunc = aa.AllAddrs + } else { + conf.addressFunc = h.Addrs + } } for _, o := range options { @@ -112,6 +117,7 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { emitReachabilityChanged: emitReachabilityChanged, service: service, recentProbes: make(map[peer.ID]time.Time), + ourAddrs: make(map[string]struct{}), } reachability := network.ReachabilityUnknown as.status.Store(&reachability) @@ -125,7 +131,6 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { } as.subscriber = subscriber - h.Network().Notify(as) go as.background() return as, nil @@ -165,117 +170,126 @@ func (as *AmbientAutoNAT) background() { defer as.subscriber.Close() defer as.emitReachabilityChanged.Close() + // We want an update when our public non-relay listen addresses have changed. + // EvtLocalAddressesUpdated is a poor proxy for that. It works when the host is Public, + // but fails when the host is private and used with AutoRelay. + addrChangeTicker := time.NewTicker(1 * time.Minute) + defer addrChangeTicker.Stop() + timer := time.NewTimer(delay) defer timer.Stop() timerRunning := true - retryProbe := false + forceProbe := false for { select { - // new inbound connection. case conn := <-as.inboundConn: localAddrs := as.host.Addrs() if manet.IsPublicAddr(conn.RemoteMultiaddr()) && !ipInList(conn.RemoteMultiaddr(), localAddrs) { as.lastInbound = time.Now() } - + case <-addrChangeTicker.C: + // schedule a new probe if addresses have changed case e := <-subChan: switch e := e.(type) { - case event.EvtLocalAddressesUpdated: - // On local address update, reduce confidence from maximum so that we schedule - // the next probe sooner - if as.confidence == maxConfidence { - as.confidence-- - } case event.EvtPeerIdentificationCompleted: - if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 { - currentStatus := *as.status.Load() - if currentStatus == network.ReachabilityUnknown { - as.tryProbe(e.Peer) - } + if proto, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(proto) > 0 { + forceProbe = true } + case event.EvtLocalAddressesUpdated: + // schedule a new probe if addresses have changed default: log.Errorf("unknown event type: %T", e) } - - // probe finished. case err, ok := <-as.dialResponses: if !ok { return } + as.pendingProbes-- if IsDialRefused(err) { - retryProbe = true + forceProbe = true } else { as.handleDialResponse(err) } case <-timer.C: + timerRunning = false + forceProbe = false + // Update the last probe time. We use it to ensure + // that we don't spam the peerstore. + as.lastProbe = time.Now() peer := as.getPeerToProbe() as.tryProbe(peer) - timerRunning = false - retryProbe = false case <-as.ctx.Done(): return } + // On address update, reduce confidence from maximum so that we schedule + // the next probe sooner + hasNewAddr := as.checkAddrs() + if hasNewAddr && as.confidence == maxConfidence { + as.confidence-- + } - // Drain the timer channel if it hasn't fired in preparation for Resetting it. if timerRunning && !timer.Stop() { <-timer.C } - timer.Reset(as.scheduleProbe(retryProbe)) + timer.Reset(as.scheduleProbe(forceProbe)) timerRunning = true } } -func (as *AmbientAutoNAT) cleanupRecentProbes() { - fixedNow := time.Now() - for k, v := range as.recentProbes { - if fixedNow.Sub(v) > as.throttlePeerPeriod { - delete(as.recentProbes, k) +func (as *AmbientAutoNAT) checkAddrs() (hasNewAddr bool) { + currentAddrs := as.addressFunc() + for _, a := range currentAddrs { + if !manet.IsPublicAddr(a) { + continue + } + if _, ok := as.ourAddrs[string(a.Bytes())]; !ok { + hasNewAddr = true + break + } + } + clear(as.ourAddrs) + for _, a := range currentAddrs { + if !manet.IsPublicAddr(a) { + continue } + as.ourAddrs[string(a.Bytes())] = struct{}{} } + return false } // scheduleProbe calculates when the next probe should be scheduled for. -func (as *AmbientAutoNAT) scheduleProbe(retryProbe bool) time.Duration { - // Our baseline is a probe every 'AutoNATRefreshInterval' - // This is modulated by: - // * if we are in an unknown state, have low confidence, or we want to retry because a probe was refused that - // should drop to 'AutoNATRetryInterval' - // * recent inbound connections (implying continued connectivity) should decrease the retry when public - // * recent inbound connections when not public mean we should try more actively to see if we're public. - fixedNow := time.Now() +func (as *AmbientAutoNAT) scheduleProbe(forceProbe bool) time.Duration { + now := time.Now() currentStatus := *as.status.Load() - nextProbe := fixedNow - // Don't look for peers in the peer store more than once per second. - if !as.lastProbeTry.IsZero() { - backoff := as.lastProbeTry.Add(time.Second) - if backoff.After(nextProbe) { - nextProbe = backoff - } + nextProbeAfter := as.config.refreshInterval + if forceProbe && currentStatus == network.ReachabilityUnknown { + // retry very quicky if forceProbe is true *and* we don't know our reachability + // limit all peers fetch from peerstore to 1 per second. + nextProbeAfter = 2 * time.Second + } else if currentStatus == network.ReachabilityUnknown || + as.confidence < maxConfidence || + (currentStatus != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe)) { + // Retry quickly in case: + // 1. Our reachability is Unknown + // 2. We don't have enough confidence in our reachability. + // 3. We're private but we received an inbound connection. + nextProbeAfter = as.config.retryInterval + } else if currentStatus == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { + // We are public and we received an inbound connection recently, + // wait a little longer + nextProbeAfter *= 2 } - if !as.lastProbe.IsZero() { - untilNext := as.config.refreshInterval - if retryProbe { - untilNext = as.config.retryInterval - } else if currentStatus == network.ReachabilityUnknown { - untilNext = as.config.retryInterval - } else if as.confidence < maxConfidence { - untilNext = as.config.retryInterval - } else if currentStatus == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { - untilNext *= 2 - } else if currentStatus != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { - untilNext /= 5 - } - - if as.lastProbe.Add(untilNext).After(nextProbe) { - nextProbe = as.lastProbe.Add(untilNext) - } + nextProbeTime := as.lastProbe.Add(nextProbeAfter) + if nextProbeTime.Before(now) { + nextProbeTime = now } if as.metricsTracer != nil { - as.metricsTracer.NextProbeTime(nextProbe) + as.metricsTracer.NextProbeTime(nextProbeTime) } - return nextProbe.Sub(fixedNow) + + return nextProbeTime.Sub(now) } // handleDialResponse updates the current status based on dial response. @@ -354,28 +368,14 @@ func (as *AmbientAutoNAT) recordObservation(observation network.Reachability) { } } -func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool { - as.lastProbeTry = time.Now() - if p.Validate() != nil { - return false - } - - if lastTime, ok := as.recentProbes[p]; ok { - if time.Since(lastTime) < as.throttlePeerPeriod { - return false - } +func (as *AmbientAutoNAT) tryProbe(p peer.ID) { + if p == "" || as.pendingProbes > 5 { + return } - as.cleanupRecentProbes() - info := as.host.Peerstore().PeerInfo(p) - - if !as.config.dialPolicy.skipPeer(info.Addrs) { - as.recentProbes[p] = time.Now() - as.lastProbe = time.Now() - go as.probe(&info) - return true - } - return false + as.recentProbes[p] = time.Now() + as.pendingProbes++ + go as.probe(&info) } func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) { @@ -399,33 +399,30 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID { return "" } - candidates := make([]peer.ID, 0, len(peers)) + // clean old probes + fixedNow := time.Now() + for k, v := range as.recentProbes { + if fixedNow.Sub(v) > as.throttlePeerPeriod { + delete(as.recentProbes, k) + } + } - for _, p := range peers { + n := len(peers) + for i, j := rand.Intn(n), 0; j < n; i, j = (i+1)%n, j+1 { + p := peers[i] info := as.host.Peerstore().PeerInfo(p) // Exclude peers which don't support the autonat protocol. if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil { continue } - // Exclude peers in backoff. - if lastTime, ok := as.recentProbes[p]; ok { - if time.Since(lastTime) < as.throttlePeerPeriod { - continue - } - } - if as.config.dialPolicy.skipPeer(info.Addrs) { continue } - candidates = append(candidates, p) - } - - if len(candidates) == 0 { - return "" + return p } - return candidates[rand.Intn(len(candidates))] + return "" } func (as *AmbientAutoNAT) Close() error { diff --git a/p2p/host/autonat/autonat_test.go b/p2p/host/autonat/autonat_test.go index d8cfcc51e4..bb52dc84ab 100644 --- a/p2p/host/autonat/autonat_test.go +++ b/p2p/host/autonat/autonat_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-testing/race" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -223,7 +224,7 @@ func TestAutoNATIncomingEvents(t *testing.T) { require.Eventually(t, func() bool { return an.Status() != network.ReachabilityUnknown - }, 500*time.Millisecond, 10*time.Millisecond, "Expected probe due to identification of autonat service") + }, 5*time.Second, 100*time.Millisecond, "Expected probe due to identification of autonat service") } func TestAutoNATDialRefused(t *testing.T) { @@ -259,6 +260,9 @@ func TestAutoNATDialRefused(t *testing.T) { } func TestAutoNATObservationRecording(t *testing.T) { + if race.WithRace() { + t.Skip("recordObservation modifies internal state accessed from other goroutine") + } hs := makeAutoNATServicePublic(t) defer hs.Close() hc, ani := makeAutoNAT(t, hs) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 5900798533..b31302098d 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -29,8 +29,7 @@ type AutoRelay struct { relayFinder *relayFinder - host host.Host - addrsF basic.AddrsFactory + host host.Host metricsTracer MetricsTracer } @@ -38,7 +37,6 @@ type AutoRelay struct { func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { r := &AutoRelay{ host: bhost, - addrsF: bhost.AddrsFactory, status: network.ReachabilityUnknown, } conf := defaultConfig @@ -51,7 +49,22 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { r.conf = &conf r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf) r.metricsTracer = &wrappedMetricsTracer{conf.metricsTracer} - bhost.AddrsFactory = r.hostAddrs + + // Update the host address factory to use autorelay addresses if we're private + // + // TODO: Don't update host address factory. Instead send our relay addresses on the eventbus. + // The host can decide how to handle those. + addrF := bhost.AddrsFactory + bhost.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + addrs = addrF(addrs) + r.mx.Lock() + defer r.mx.Unlock() + + if r.status != network.ReachabilityPrivate { + return addrs + } + return r.relayFinder.relayAddrs(addrs) + } return r, nil } @@ -103,20 +116,6 @@ func (r *AutoRelay) background() { } } -func (r *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - return r.relayAddrs(r.addrsF(addrs)) -} - -func (r *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - r.mx.Lock() - defer r.mx.Unlock() - - if r.status != network.ReachabilityPrivate { - return addrs - } - return r.relayFinder.relayAddrs(addrs) -} - func (r *AutoRelay) Close() error { r.ctxCancel() err := r.relayFinder.Stop() diff --git a/p2p/host/autorelay/relay.go b/p2p/host/autorelay/relay.go index db0d97ec01..2ae5bf240c 100644 --- a/p2p/host/autorelay/relay.go +++ b/p2p/host/autorelay/relay.go @@ -5,6 +5,8 @@ import ( ) // Filter filters out all relay addresses. +// +// Deprecated: It is trivial for a user to implement this if they need this. func Filter(addrs []ma.Multiaddr) []ma.Multiaddr { raddrs := make([]ma.Multiaddr, 0, len(addrs)) for _, addr := range addrs { diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index b5d252e9d2..e0106f82af 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -822,11 +822,11 @@ func (h *BasicHost) ConnManager() connmgr.ConnManager { return h.cmgr } -// Addrs returns listening addresses that are safe to announce to the network. -// The output is the same as AllAddrs, but processed by AddrsFactory. +// Addrs returns listening addresses. The output is the same as AllAddrs, but +// processed by AddrsFactory. +// When used with AutoRelay, and if the host is not publicly reachable, +// this will only have host's private, relay, and no public addresses. func (h *BasicHost) Addrs() []ma.Multiaddr { - // We don't need to append certhashes here, the user provided addrsFactory was - // wrapped with addCertHashes in the constructor. addrs := h.AddrsFactory(h.AllAddrs()) // Make a copy. Consumers can modify the slice elements res := make([]ma.Multiaddr, len(addrs)) @@ -851,9 +851,9 @@ func (h *BasicHost) NormalizeMultiaddr(addr ma.Multiaddr) ma.Multiaddr { return addr } +var p2pCircuitAddr = ma.StringCast("/p2p-circuit") + // AllAddrs returns all the addresses the host is listening on except circuit addresses. -// The output has webtransport addresses inferred from quic addresses. -// All the addresses have the correct func (h *BasicHost) AllAddrs() []ma.Multiaddr { listenAddrs := h.Network().ListenAddresses() if len(listenAddrs) == 0 { @@ -946,6 +946,16 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr { finalAddrs = append(finalAddrs, observedAddrs...) } finalAddrs = ma.Unique(finalAddrs) + // Remove /p2p-circuit addresses from the list. + // The p2p-circuit tranport listener reports its address as just /p2p-circuit + // This is useless for dialing. Users need to manage their circuit addresses themselves, + // or use AutoRelay. + finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool { + return a.Equal(p2pCircuitAddr) + }) + // Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered + // using identify. + finalAddrs = h.addCertHashes(finalAddrs) return finalAddrs }