diff --git a/build/version.go b/build/version.go
index e07fdd58a9..f0face37e2 100644
--- a/build/version.go
+++ b/build/version.go
@@ -47,7 +47,7 @@ const (
 	// AppPreRelease MUST only contain characters from semanticAlphabet per
 	// the semantic versioning spec.
-	AppPreRelease = "beta.rc1"
+	AppPreRelease = "beta.rc2"
 )

 func init() {
diff --git a/channeldb/error.go b/channeldb/error.go
index 859af97464..629cd93c6f 100644
--- a/channeldb/error.go
+++ b/channeldb/error.go
@@ -43,6 +43,10 @@ var (
 	// created.
 	ErrMetaNotFound = fmt.Errorf("unable to locate meta information")

+	// ErrClosedScidsNotFound is returned when the closed scid bucket
+	// hasn't been created.
+	ErrClosedScidsNotFound = fmt.Errorf("closed scid bucket doesn't exist")
+
 	// ErrGraphNotFound is returned when at least one of the components of
 	// graph doesn't exist.
 	ErrGraphNotFound = fmt.Errorf("graph bucket not initialized")
diff --git a/channeldb/graph.go b/channeldb/graph.go
index 4146721660..86cbe9aa93 100644
--- a/channeldb/graph.go
+++ b/channeldb/graph.go
@@ -153,6 +153,14 @@ var (
 	// case we'll remove all entries from the prune log with a block height
 	// that no longer exists.
 	pruneLogBucket = []byte("prune-log")
+
+	// closedScidBucket is a top-level bucket that stores scids for
+	// channels that we know to be closed. This is used so that we don't
+	// need to perform expensive validation checks if we receive a channel
+	// announcement for the channel again.
+	//
+	// maps: scid -> []byte{}
+	closedScidBucket = []byte("closed-scid")
 )

 const (
@@ -318,6 +326,7 @@ var graphTopLevelBuckets = [][]byte{
 	nodeBucket,
 	edgeBucket,
 	graphMetaBucket,
+	closedScidBucket,
 }

 // Wipe completely deletes all saved state within all used buckets within the
@@ -2143,6 +2152,21 @@ func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
 				zombieIndex, scid,
 			)

+			// TODO(ziggie): Make sure that for the strict
+			// pruning case we compare the pubkeys and
+			// that the respective timestamp is not older
+			// than the `ChannelPruneExpiry`.
+			//
+			// NOTE: The timestamp data has no verification
+			// attached to it in the `ReplyChannelRange`
+			// msg, so we are trusting this data at this
+			// point. However, it is not critical because
+			// we only remove the channel from the db when
+			// the timestamps are more recent. During the
+			// querying of the gossip msg, verification
+			// happens as usual.
+			// However, we should consider punishing peers
+			// that don't provide us with honest data.
 			isStillZombie := isZombieChan(
 				info.Node1UpdateTimestamp,
 				info.Node2UpdateTimestamp,
@@ -2211,6 +2235,29 @@ type ChannelUpdateInfo struct {
 	Node2UpdateTimestamp time.Time
 }

+// NewChannelUpdateInfo is a constructor that ensures we initialize the
+// timestamps with the zero-second unix timestamp, which equals
+// `January 1, 1970, 00:00:00 UTC`, in case the given value is `time.Time{}`.
+func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
+	node2Timestamp time.Time) ChannelUpdateInfo {
+
+	chanInfo := ChannelUpdateInfo{
+		ShortChannelID:       scid,
+		Node1UpdateTimestamp: node1Timestamp,
+		Node2UpdateTimestamp: node2Timestamp,
+	}
+
+	if node1Timestamp.IsZero() {
+		chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
+	}
+
+	if node2Timestamp.IsZero() {
+		chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
+	}
+
+	return chanInfo
+}
+
 // BlockChannelRange represents a range of channels for a given block height.
type BlockChannelRange struct { // Height is the height of the block all of the channels below were @@ -2284,9 +2331,9 @@ func (c *ChannelGraph) FilterChannelRange(startHeight, rawCid := byteOrder.Uint64(k) cid := lnwire.NewShortChanIDFromInt(rawCid) - chanInfo := ChannelUpdateInfo{ - ShortChannelID: cid, - } + chanInfo := NewChannelUpdateInfo( + cid, time.Time{}, time.Time{}, + ) if !withTimestamps { channelsPerBlock[cid.BlockHeight] = append( @@ -3846,6 +3893,53 @@ func (c *ChannelGraph) NumZombies() (uint64, error) { return numZombies, nil } +// PutClosedScid stores a SCID for a closed channel in the database. This is so +// that we can ignore channel announcements that we know to be closed without +// having to validate them and fetch a block. +func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error { + return kvdb.Update(c.db, func(tx kvdb.RwTx) error { + closedScids, err := tx.CreateTopLevelBucket(closedScidBucket) + if err != nil { + return err + } + + var k [8]byte + byteOrder.PutUint64(k[:], scid.ToUint64()) + + return closedScids.Put(k[:], []byte{}) + }, func() {}) +} + +// IsClosedScid checks whether a channel identified by the passed in scid is +// closed. This helps avoid having to perform expensive validation checks. +// TODO: Add an LRU cache to cut down on disc reads. +func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { + var isClosed bool + err := kvdb.View(c.db, func(tx kvdb.RTx) error { + closedScids := tx.ReadBucket(closedScidBucket) + if closedScids == nil { + return ErrClosedScidsNotFound + } + + var k [8]byte + byteOrder.PutUint64(k[:], scid.ToUint64()) + + if closedScids.Get(k[:]) != nil { + isClosed = true + return nil + } + + return nil + }, func() { + isClosed = false + }) + if err != nil { + return false, err + } + + return isClosed, nil +} + func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl updateIndex kvdb.RwBucket, node *LightningNode) error { diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 00e30d5b24..f45bc307b2 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -1980,9 +1980,9 @@ func TestFilterKnownChanIDs(t *testing.T) { t.Fatalf("unable to create channel edge: %v", err) } - chanIDs = append(chanIDs, ChannelUpdateInfo{ - ShortChannelID: chanID, - }) + chanIDs = append(chanIDs, NewChannelUpdateInfo( + chanID, time.Time{}, time.Time{}, + )) } const numZombies = 5 @@ -2024,20 +2024,28 @@ func TestFilterKnownChanIDs(t *testing.T) { // should get the same set back. 
{ queryIDs: []ChannelUpdateInfo{ - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 99, - }}, - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 100, - }}, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 99, + }, + }, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 100, + }, + }, }, resp: []ChannelUpdateInfo{ - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 99, - }}, - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 100, - }}, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 99, + }, + }, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 100, + }, + }, }, }, @@ -2419,7 +2427,7 @@ func TestFilterChannelRange(t *testing.T) { ) ) - updateTimeSeed := int64(1) + updateTimeSeed := time.Now().Unix() maybeAddPolicy := func(chanID uint64, node *LightningNode, node2 bool) time.Time { @@ -2428,7 +2436,7 @@ func TestFilterChannelRange(t *testing.T) { chanFlags = lnwire.ChanUpdateDirection } - var updateTime time.Time + var updateTime = time.Unix(0, 0) if rand.Int31n(2) == 0 { updateTime = time.Unix(updateTimeSeed, 0) err = graph.UpdateEdgePolicy(&models.ChannelEdgePolicy{ @@ -2456,11 +2464,16 @@ func TestFilterChannelRange(t *testing.T) { ) require.NoError(t, graph.AddChannelEdge(&channel2)) + chanInfo1 := NewChannelUpdateInfo( + chanID1, time.Time{}, time.Time{}, + ) + chanInfo2 := NewChannelUpdateInfo( + chanID2, time.Time{}, time.Time{}, + ) channelRanges = append(channelRanges, BlockChannelRange{ Height: chanHeight, Channels: []ChannelUpdateInfo{ - {ShortChannelID: chanID1}, - {ShortChannelID: chanID2}, + chanInfo1, chanInfo2, }, }) @@ -2471,20 +2484,17 @@ func TestFilterChannelRange(t *testing.T) { time4 = maybeAddPolicy(channel2.ChannelID, node2, true) ) + chanInfo1 = NewChannelUpdateInfo( + chanID1, time1, time2, + ) + chanInfo2 = NewChannelUpdateInfo( + chanID2, time3, time4, + ) channelRangesWithTimestamps = append( channelRangesWithTimestamps, BlockChannelRange{ Height: chanHeight, Channels: []ChannelUpdateInfo{ - { - ShortChannelID: chanID1, - Node1UpdateTimestamp: time1, - Node2UpdateTimestamp: time2, - }, - { - ShortChannelID: chanID2, - Node1UpdateTimestamp: time3, - Node2UpdateTimestamp: time4, - }, + chanInfo1, chanInfo2, }, }, ) @@ -4027,3 +4037,28 @@ func TestGraphLoading(t *testing.T) { graphReloaded.graphCache.nodeFeatures, ) } + +// TestClosedScid tests that we can correctly insert a SCID into the index of +// closed short channel ids. +func TestClosedScid(t *testing.T) { + t.Parallel() + + graph, err := MakeTestGraph(t) + require.Nil(t, err) + + scid := lnwire.ShortChannelID{} + + // The scid should not exist in the closedScidBucket. + exists, err := graph.IsClosedScid(scid) + require.Nil(t, err) + require.False(t, exists) + + // After we call PutClosedScid, the call to IsClosedScid should return + // true. + err = graph.PutClosedScid(scid) + require.Nil(t, err) + + exists, err = graph.IsClosedScid(scid) + require.Nil(t, err) + require.True(t, exists) +} diff --git a/discovery/ban.go b/discovery/ban.go new file mode 100644 index 0000000000..cd70d7c381 --- /dev/null +++ b/discovery/ban.go @@ -0,0 +1,252 @@ +package discovery + +import ( + "errors" + "sync" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightninglabs/neutrino/cache" + "github.com/lightninglabs/neutrino/cache/lru" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +const ( + // maxBannedPeers limits the maximum number of banned pubkeys that + // we'll store. + // TODO(eugene): tune. 
+ maxBannedPeers = 10_000 + + // banThreshold is the point at which non-channel peers will be banned. + // TODO(eugene): tune. + banThreshold = 100 + + // banTime is the amount of time that the non-channel peer will be + // banned for. Channel announcements from channel peers will be dropped + // if it's not one of our channels. + // TODO(eugene): tune. + banTime = time.Hour * 48 + + // resetDelta is the time after a peer's last ban update that we'll + // reset its ban score. + // TODO(eugene): tune. + resetDelta = time.Hour * 48 + + // purgeInterval is how often we'll remove entries from the + // peerBanIndex and allow peers to be un-banned. This interval is also + // used to reset ban scores of peers that aren't banned. + purgeInterval = time.Minute * 10 +) + +var ErrPeerBanned = errors.New("peer has bypassed ban threshold - banning") + +// ClosedChannelTracker handles closed channels being gossiped to us. +type ClosedChannelTracker interface { + // GraphCloser is used to mark channels as closed and to check whether + // certain channels are closed. + GraphCloser + + // IsChannelPeer checks whether we have a channel with a peer. + IsChannelPeer(*btcec.PublicKey) (bool, error) +} + +// GraphCloser handles tracking closed channels by their scid. +type GraphCloser interface { + // PutClosedScid marks a channel as closed so that we won't validate + // channel announcements for it again. + PutClosedScid(lnwire.ShortChannelID) error + + // IsClosedScid checks if a short channel id is closed. + IsClosedScid(lnwire.ShortChannelID) (bool, error) +} + +// NodeInfoInquirier handles queries relating to specific nodes and channels +// they may have with us. +type NodeInfoInquirer interface { + // FetchOpenChannels returns the set of channels that we have with the + // peer identified by the passed-in public key. + FetchOpenChannels(*btcec.PublicKey) ([]*channeldb.OpenChannel, error) +} + +// ScidCloserMan helps the gossiper handle closed channels that are in the +// ChannelGraph. +type ScidCloserMan struct { + graph GraphCloser + channelDB NodeInfoInquirer +} + +// NewScidCloserMan creates a new ScidCloserMan. +func NewScidCloserMan(graph GraphCloser, + channelDB NodeInfoInquirer) *ScidCloserMan { + + return &ScidCloserMan{ + graph: graph, + channelDB: channelDB, + } +} + +// PutClosedScid marks scid as closed so the gossiper can ignore this channel +// in the future. +func (s *ScidCloserMan) PutClosedScid(scid lnwire.ShortChannelID) error { + return s.graph.PutClosedScid(scid) +} + +// IsClosedScid checks whether scid is closed so that the gossiper can ignore +// it. +func (s *ScidCloserMan) IsClosedScid(scid lnwire.ShortChannelID) (bool, + error) { + + return s.graph.IsClosedScid(scid) +} + +// IsChannelPeer checks whether we have a channel with the peer. +func (s *ScidCloserMan) IsChannelPeer(peerKey *btcec.PublicKey) (bool, error) { + chans, err := s.channelDB.FetchOpenChannels(peerKey) + if err != nil { + return false, err + } + + return len(chans) > 0, nil +} + +// A compile-time constraint to ensure ScidCloserMan implements +// ClosedChannelTracker. +var _ ClosedChannelTracker = (*ScidCloserMan)(nil) + +// cachedBanInfo is used to track a peer's ban score and if it is banned. +type cachedBanInfo struct { + score uint64 + lastUpdate time.Time +} + +// Size returns the "size" of an entry. +func (c *cachedBanInfo) Size() (uint64, error) { + return 1, nil +} + +// isBanned returns true if the ban score is greater than the ban threshold. 
+func (c *cachedBanInfo) isBanned() bool {
+	return c.score >= banThreshold
+}
+
+// banman is responsible for banning peers that are misbehaving. The banman is
+// in-memory and will be reset upon restart of LND. If a node's pubkey is in
+// the peerBanIndex, it has a ban score. Ban scores start at 1 and are
+// incremented by 1 for each instance of misbehavior. It uses an LRU cache to
+// cut down on memory usage in case there are many banned peers and to protect
+// against DoS.
+type banman struct {
+	// peerBanIndex tracks our peers' ban scores, whether they are banned,
+	// and for how long. The ban score is incremented when a peer sends us
+	// invalid gossip messages.
+	peerBanIndex *lru.Cache[[33]byte, *cachedBanInfo]
+
+	wg   sync.WaitGroup
+	quit chan struct{}
+}
+
+// newBanman creates a new banman with the default maxBannedPeers.
+func newBanman() *banman {
+	return &banman{
+		peerBanIndex: lru.NewCache[[33]byte, *cachedBanInfo](
+			maxBannedPeers,
+		),
+		quit: make(chan struct{}),
+	}
+}
+
+// start kicks off the banman by calling purgeExpiredBans.
+func (b *banman) start() {
+	b.wg.Add(1)
+	go b.purgeExpiredBans()
+}
+
+// stop halts the banman.
+func (b *banman) stop() {
+	close(b.quit)
+	b.wg.Wait()
+}
+
+// purgeExpiredBans periodically removes ban entries whose ban has expired.
+func (b *banman) purgeExpiredBans() {
+	defer b.wg.Done()
+
+	purgeTicker := time.NewTicker(purgeInterval)
+	defer purgeTicker.Stop()
+
+	for {
+		select {
+		case <-purgeTicker.C:
+			b.purgeBanEntries()
+
+		case <-b.quit:
+			return
+		}
+	}
+}
+
+// purgeBanEntries does two things:
+// - removes peers from our ban list whose ban timer is up
+// - removes peers whose ban scores have expired.
+func (b *banman) purgeBanEntries() {
+	keysToRemove := make([][33]byte, 0)
+
+	sweepEntries := func(pubkey [33]byte, banInfo *cachedBanInfo) bool {
+		if banInfo.isBanned() {
+			// If the peer is banned, check if the ban timer has
+			// expired.
+			if banInfo.lastUpdate.Add(banTime).Before(time.Now()) {
+				keysToRemove = append(keysToRemove, pubkey)
+			}
+
+			return true
+		}
+
+		if banInfo.lastUpdate.Add(resetDelta).Before(time.Now()) {
+			// Remove non-banned peers whose ban scores have
+			// expired.
+			keysToRemove = append(keysToRemove, pubkey)
+		}
+
+		return true
+	}
+
+	b.peerBanIndex.Range(sweepEntries)
+
+	for _, key := range keysToRemove {
+		b.peerBanIndex.Delete(key)
+	}
+}
+
+// isBanned checks whether the peer identified by the pubkey is banned.
+func (b *banman) isBanned(pubkey [33]byte) bool {
+	banInfo, err := b.peerBanIndex.Get(pubkey)
+	switch {
+	case errors.Is(err, cache.ErrElementNotFound):
+		return false
+
+	default:
+		return banInfo.isBanned()
+	}
+}
+
+// incrementBanScore increments a peer's ban score.
+func (b *banman) incrementBanScore(pubkey [33]byte) { + banInfo, err := b.peerBanIndex.Get(pubkey) + switch { + case errors.Is(err, cache.ErrElementNotFound): + cachedInfo := &cachedBanInfo{ + score: 1, + lastUpdate: time.Now(), + } + _, _ = b.peerBanIndex.Put(pubkey, cachedInfo) + default: + cachedInfo := &cachedBanInfo{ + score: banInfo.score + 1, + lastUpdate: time.Now(), + } + + _, _ = b.peerBanIndex.Put(pubkey, cachedInfo) + } +} diff --git a/discovery/ban_test.go b/discovery/ban_test.go new file mode 100644 index 0000000000..e4149028b2 --- /dev/null +++ b/discovery/ban_test.go @@ -0,0 +1,60 @@ +package discovery + +import ( + "testing" + "time" + + "github.com/lightninglabs/neutrino/cache" + "github.com/stretchr/testify/require" +) + +// TestPurgeBanEntries tests that we properly purge ban entries on a timer. +func TestPurgeBanEntries(t *testing.T) { + t.Parallel() + + b := newBanman() + + // Ban a peer by repeatedly incrementing its ban score. + peer1 := [33]byte{0x00} + + for i := 0; i < banThreshold; i++ { + b.incrementBanScore(peer1) + } + + // Assert that the peer is now banned. + require.True(t, b.isBanned(peer1)) + + // A call to purgeBanEntries should not remove the peer from the index. + b.purgeBanEntries() + require.True(t, b.isBanned(peer1)) + + // Now set the peer's last update time to two banTimes in the past so + // that we can assert that purgeBanEntries does remove it from the + // index. + banInfo, err := b.peerBanIndex.Get(peer1) + require.NoError(t, err) + + banInfo.lastUpdate = time.Now().Add(-2 * banTime) + + b.purgeBanEntries() + _, err = b.peerBanIndex.Get(peer1) + require.ErrorIs(t, err, cache.ErrElementNotFound) + + // Increment the peer's ban score again but don't get it banned. + b.incrementBanScore(peer1) + require.False(t, b.isBanned(peer1)) + + // Assert that purgeBanEntries does nothing. + b.purgeBanEntries() + banInfo, err = b.peerBanIndex.Get(peer1) + require.Nil(t, err) + + // Set its lastUpdate time to 2 resetDelta's in the past so that + // purgeBanEntries removes it. + banInfo.lastUpdate = time.Now().Add(-2 * resetDelta) + + b.purgeBanEntries() + + _, err = b.peerBanIndex.Get(peer1) + require.ErrorIs(t, err, cache.ErrElementNotFound) +} diff --git a/discovery/gossiper.go b/discovery/gossiper.go index bb0aa652c4..84fae767f0 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -256,6 +256,11 @@ type Config struct { // here? AnnSigner lnwallet.MessageSigner + // ScidCloser is an instance of ClosedChannelTracker that helps the + // gossiper cut down on spam channel announcements for already closed + // channels. + ScidCloser ClosedChannelTracker + // NumActiveSyncers is the number of peers for which we should have // active syncers with. After reaching NumActiveSyncers, any future // gossip syncers will be passive. @@ -434,6 +439,9 @@ type AuthenticatedGossiper struct { // ChannelAnnouncement for the channel is received. prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg] + // banman tracks our peer's ban status. + banman *banman + // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. 
@@ -512,6 +520,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper maxRejectedUpdates, ), chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), + banman: newBanman(), } gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ @@ -606,6 +615,8 @@ func (d *AuthenticatedGossiper) start() error { d.syncMgr.Start() + d.banman.start() + // Start receiving blocks in its dedicated goroutine. d.wg.Add(2) go d.syncBlockHeight() @@ -762,6 +773,8 @@ func (d *AuthenticatedGossiper) stop() { d.syncMgr.Stop() + d.banman.stop() + close(d.quit) d.wg.Wait() @@ -2232,7 +2245,7 @@ func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo, BitcoinKey1: info.BitcoinKey1Bytes, Features: lnwire.NewRawFeatureVector(), BitcoinKey2: info.BitcoinKey2Bytes, - ExtraOpaqueData: edge.ExtraOpaqueData, + ExtraOpaqueData: info.ExtraOpaqueData, } chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature( info.AuthProof.NodeSig1Bytes, @@ -2399,8 +2412,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, ann *lnwire.ChannelAnnouncement, ops []batch.SchedulerOption) ([]networkMsg, bool) { + scid := ann.ShortChannelID + log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v", - nMsg.peer, ann.ShortChannelID.ToUint64()) + nMsg.peer, scid.ToUint64()) // We'll ignore any channel announcements that target any chain other // than the set of chains we know of. @@ -2411,7 +2426,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, log.Errorf(err.Error()) key := newRejectCacheKey( - ann.ShortChannelID.ToUint64(), + scid.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -2423,13 +2438,12 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, // If this is a remote ChannelAnnouncement with an alias SCID, we'll // reject the announcement. Since the router accepts alias SCIDs, // not erroring out would be a DoS vector. - if nMsg.isRemote && d.cfg.IsAlias(ann.ShortChannelID) { - err := fmt.Errorf("ignoring remote alias channel=%v", - ann.ShortChannelID) + if nMsg.isRemote && d.cfg.IsAlias(scid) { + err := fmt.Errorf("ignoring remote alias channel=%v", scid) log.Errorf(err.Error()) key := newRejectCacheKey( - ann.ShortChannelID.ToUint64(), + scid.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -2441,11 +2455,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, // If the advertised inclusionary block is beyond our knowledge of the // chain tip, then we'll ignore it for now. d.Lock() - if nMsg.isRemote && d.isPremature(ann.ShortChannelID, 0, nMsg) { + if nMsg.isRemote && d.isPremature(scid, 0, nMsg) { log.Warnf("Announcement for chan_id=(%v), is premature: "+ "advertises height %v, only height %v is known", - ann.ShortChannelID.ToUint64(), - ann.ShortChannelID.BlockHeight, d.bestHeight) + scid.ToUint64(), scid.BlockHeight, d.bestHeight) d.Unlock() nMsg.err <- nil return nil, false @@ -2454,11 +2467,56 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, // At this point, we'll now ask the router if this is a zombie/known // edge. If so we can skip all the processing below. - if d.cfg.Graph.IsKnownEdge(ann.ShortChannelID) { + if d.cfg.Graph.IsKnownEdge(scid) { nMsg.err <- nil return nil, true } + // Check if the channel is already closed in which case we can ignore + // it. 
+ closed, err := d.cfg.ScidCloser.IsClosedScid(scid) + if err != nil { + log.Errorf("failed to check if scid %v is closed: %v", scid, + err) + nMsg.err <- err + + return nil, false + } + + if closed { + err = fmt.Errorf("ignoring closed channel %v", scid) + log.Error(err) + + // If this is an announcement from us, we'll just ignore it. + if !nMsg.isRemote { + nMsg.err <- err + return nil, false + } + + // Increment the peer's ban score if they are sending closed + // channel announcements. + d.banman.incrementBanScore(nMsg.peer.PubKey()) + + // If the peer is banned and not a channel peer, we'll + // disconnect them. + shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey()) + if dcErr != nil { + log.Errorf("failed to check if we should disconnect "+ + "peer: %v", dcErr) + nMsg.err <- dcErr + + return nil, false + } + + if shouldDc { + nMsg.peer.Disconnect(ErrPeerBanned) + } + + nMsg.err <- err + + return nil, false + } + // If this is a remote channel announcement, then we'll validate all // the signatures within the proof as it should be well formed. var proof *models.ChannelAuthProof @@ -2468,7 +2526,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, "%v", err) key := newRejectCacheKey( - ann.ShortChannelID.ToUint64(), + scid.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) @@ -2499,7 +2557,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, } edge := &models.ChannelEdgeInfo{ - ChannelID: ann.ShortChannelID.ToUint64(), + ChannelID: scid.ToUint64(), ChainHash: ann.ChainHash, NodeKey1Bytes: ann.NodeID1, NodeKey2Bytes: ann.NodeID2, @@ -2522,8 +2580,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, } } - log.Debugf("Adding edge for short_chan_id: %v", - ann.ShortChannelID.ToUint64()) + log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64()) // We will add the edge to the channel router. If the nodes present in // this channel are not present in the database, a partial node will be @@ -2533,24 +2590,25 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, // channel ID. We do this to ensure no other goroutine has read the // database and is now making decisions based on this DB state, before // it writes to the DB. - d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) - err := d.cfg.Graph.AddEdge(edge, ops...) + d.channelMtx.Lock(scid.ToUint64()) + err = d.cfg.Graph.AddEdge(edge, ops...) if err != nil { log.Debugf("Graph rejected edge for short_chan_id(%v): %v", - ann.ShortChannelID.ToUint64(), err) + scid.ToUint64(), err) - defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + defer d.channelMtx.Unlock(scid.ToUint64()) // If the edge was rejected due to already being known, then it // may be the case that this new message has a fresh channel // proof, so we'll check. - if graph.IsError(err, graph.ErrIgnored) { + switch { + case graph.IsError(err, graph.ErrIgnored): // Attempt to process the rejected message to see if we // get any new announcements. 
anns, rErr := d.processRejectedEdge(ann, proof) if rErr != nil { key := newRejectCacheKey( - ann.ShortChannelID.ToUint64(), + scid.ToUint64(), sourceToPub(nMsg.source), ) cr := &cachedReject{} @@ -2572,31 +2630,99 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, nMsg.err <- nil return anns, true - } else { + + case graph.IsError( + err, graph.ErrNoFundingTransaction, + graph.ErrInvalidFundingOutput, + ): + key := newRejectCacheKey( + scid.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + // Increment the peer's ban score. We check isRemote + // so we don't actually ban the peer in case of a local + // bug. + if nMsg.isRemote { + d.banman.incrementBanScore(nMsg.peer.PubKey()) + } + + case graph.IsError(err, graph.ErrChannelSpent): + key := newRejectCacheKey( + scid.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + // Since this channel has already been closed, we'll + // add it to the graph's closed channel index such that + // we won't attempt to do expensive validation checks + // on it again. + // TODO: Populate the ScidCloser by using closed + // channel notifications. + dbErr := d.cfg.ScidCloser.PutClosedScid(scid) + if dbErr != nil { + log.Errorf("failed to mark scid(%v) as "+ + "closed: %v", scid, dbErr) + + nMsg.err <- dbErr + + return nil, false + } + + // Increment the peer's ban score. We check isRemote + // so we don't accidentally ban ourselves in case of a + // bug. + if nMsg.isRemote { + d.banman.incrementBanScore(nMsg.peer.PubKey()) + } + + default: // Otherwise, this is just a regular rejected edge. key := newRejectCacheKey( - ann.ShortChannelID.ToUint64(), + scid.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) } + if !nMsg.isRemote { + log.Errorf("failed to add edge for local channel: %v", + err) + nMsg.err <- err + + return nil, false + } + + shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey()) + if dcErr != nil { + log.Errorf("failed to check if we should disconnect "+ + "peer: %v", dcErr) + nMsg.err <- dcErr + + return nil, false + } + + if shouldDc { + nMsg.peer.Disconnect(ErrPeerBanned) + } + nMsg.err <- err + return nil, false } // If err is nil, release the lock immediately. - d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + d.channelMtx.Unlock(scid.ToUint64()) - log.Debugf("Finish adding edge for short_chan_id: %v", - ann.ShortChannelID.ToUint64()) + log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64()) // If we earlier received any ChannelUpdates for this channel, we can // now process them, as the channel is added to the graph. - shortChanID := ann.ShortChannelID.ToUint64() var channelUpdates []*processedNetworkMsg - earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) + earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64()) if err == nil { // There was actually an entry in the map, so we'll accumulate // it. We don't worry about deletion, since it'll eventually @@ -2629,8 +2755,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, // shuts down. 
 		case *lnwire.ChannelUpdate:
 			log.Debugf("Reprocessing ChannelUpdate for "+
-				"shortChanID=%v",
-				msg.ShortChannelID.ToUint64())
+				"shortChanID=%v", scid.ToUint64())

 			select {
 			case d.networkMsgs <- updMsg:
@@ -2664,7 +2789,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 	nMsg.err <- nil

 	log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v",
-		nMsg.peer, ann.ShortChannelID.ToUint64())
+		nMsg.peer, scid.ToUint64())

 	return announcements, true
 }
@@ -2745,6 +2870,22 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 		return nil, true
 	}

+	// Check that the ChannelUpdate is not too far in the future; this
+	// could reveal a faulty implementation, so we log an error.
+	if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
+		log.Errorf("Skewed timestamp (%v) for edge policy of "+
+			"short_chan_id(%v), timestamp too far in the future: "+
+			"peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
+			shortChanID, nMsg.peer, nMsg.msg.MsgType(),
+			nMsg.isRemote,
+		)
+
+		nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
+			"timestamp too far in the future: %v", timestamp.Unix())
+
+		return nil, false
+	}
+
 	// Get the node pub key as far since we don't have it in the channel
 	// update announcement message. We'll need this to properly verify the
 	// message's signature.
@@ -3373,3 +3514,36 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
 	nMsg.err <- nil
 	return announcements, true
 }
+
+// isBanned returns true if the peer identified by pubkey is banned for sending
+// invalid channel announcements.
+func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
+	return d.banman.isBanned(pubkey)
+}
+
+// ShouldDisconnect returns true if we should disconnect the peer identified by
+// pubkey.
+func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
+	bool, error) {
+
+	pubkeySer := pubkey.SerializeCompressed()
+
+	var pubkeyBytes [33]byte
+	copy(pubkeyBytes[:], pubkeySer)
+
+	// If the public key is banned, check whether or not this is a channel
+	// peer.
+	if d.isBanned(pubkeyBytes) {
+		isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
+		if err != nil {
+			return false, err
+		}
+
+		// We should only disconnect non-channel peers.
+ if !isChanPeer { + return true, nil + } + } + + return false, nil +} diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 7cfc7bce8f..c7cb149cfe 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/models" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/graph" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/kvdb" @@ -90,12 +91,13 @@ func makeTestDB(t *testing.T) (*channeldb.DB, error) { type mockGraphSource struct { bestHeight uint32 - mu sync.Mutex - nodes []channeldb.LightningNode - infos map[uint64]models.ChannelEdgeInfo - edges map[uint64][]models.ChannelEdgePolicy - zombies map[uint64][][33]byte - chansToReject map[uint64]struct{} + mu sync.Mutex + nodes []channeldb.LightningNode + infos map[uint64]models.ChannelEdgeInfo + edges map[uint64][]models.ChannelEdgePolicy + zombies map[uint64][][33]byte + chansToReject map[uint64]struct{} + addEdgeErrCode fn.Option[graph.ErrorCode] } func newMockRouter(height uint32) *mockGraphSource { @@ -126,6 +128,12 @@ func (r *mockGraphSource) AddEdge(info *models.ChannelEdgeInfo, r.mu.Lock() defer r.mu.Unlock() + if r.addEdgeErrCode.IsSome() { + return graph.NewErrf( + r.addEdgeErrCode.UnsafeFromSome(), "received error", + ) + } + if _, ok := r.infos[info.ChannelID]; ok { return errors.New("info already exist") } @@ -138,6 +146,14 @@ func (r *mockGraphSource) AddEdge(info *models.ChannelEdgeInfo, return nil } +func (r *mockGraphSource) resetAddEdgeErrCode() { + r.addEdgeErrCode = fn.None[graph.ErrorCode]() +} + +func (r *mockGraphSource) setAddEdgeErrCode(code graph.ErrorCode) { + r.addEdgeErrCode = fn.Some[graph.ErrorCode](code) +} + func (r *mockGraphSource) queueValidationFail(chanID uint64) { r.mu.Lock() defer r.mu.Unlock() @@ -707,7 +723,9 @@ type testCtx struct { broadcastedMessage chan msgWithSenders } -func createTestCtx(t *testing.T, startHeight uint32) (*testCtx, error) { +func createTestCtx(t *testing.T, startHeight uint32, isChanPeer bool) ( + *testCtx, error) { + // Next we'll initialize an instance of the channel router with mock // versions of the chain and channel notifier. 
As we don't need to test // any p2p functionality, the peer send and switch send, @@ -765,7 +783,7 @@ func createTestCtx(t *testing.T, startHeight uint32) (*testCtx, error) { peerChan chan<- lnpeer.Peer) { pk, _ := btcec.ParsePubKey(target[:]) - peerChan <- &mockPeer{pk, nil, nil} + peerChan <- &mockPeer{pk, nil, nil, atomic.Bool{}} }, NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { c := make(chan struct{}) @@ -803,6 +821,7 @@ func createTestCtx(t *testing.T, startHeight uint32) (*testCtx, error) { FindBaseByAlias: findBaseByAlias, GetAlias: getAlias, FindChannel: mockFindChannel, + ScidCloser: newMockScidCloser(isChanPeer), }, selfKeyDesc) if err := gossiper.Start(); err != nil { @@ -831,7 +850,7 @@ func TestProcessAnnouncement(t *testing.T) { t.Parallel() timestamp := testTimestamp - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") assertSenderExistence := func(sender *btcec.PublicKey, msg msgWithSenders) { @@ -843,7 +862,7 @@ func TestProcessAnnouncement(t *testing.T) { } } - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} // First, we'll craft a valid remote channel announcement and send it to // the gossiper so that it can be processed. @@ -947,13 +966,13 @@ func TestPrematureAnnouncement(t *testing.T) { timestamp := testTimestamp - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") _, err = createNodeAnnouncement(remoteKeyPriv1, timestamp) require.NoError(t, err, "can't create node announcement") - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} // Pretending that we receive the valid channel announcement from // remote side, but block height of this announcement is greater than @@ -978,7 +997,7 @@ func TestPrematureAnnouncement(t *testing.T) { func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") // Set up a channel that we can use to inspect the messages sent @@ -990,7 +1009,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { pk, _ := btcec.ParsePubKey(target[:]) select { - case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}: + case peerChan <- &mockPeer{ + pk, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + }: case <-ctx.gossiper.quit: } } @@ -1000,7 +1021,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // Recreate lightning network topology. Initialize router with channel // between two nodes. 
@@ -1150,7 +1173,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { func TestOrphanSignatureAnnouncement(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") // Set up a channel that we can use to inspect the messages sent @@ -1162,7 +1185,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { pk, _ := btcec.ParsePubKey(target[:]) select { - case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}: + case peerChan <- &mockPeer{ + pk, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + }: case <-ctx.gossiper.quit: } } @@ -1172,7 +1197,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process, in @@ -1333,7 +1360,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") batch, err := createLocalAnnouncements(0) @@ -1344,7 +1371,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // Set up a channel to intercept the messages sent to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{}, + } // Since the reliable send to the remote peer of the local channel proof // requires a notification when the peer comes online, we'll capture the @@ -1566,7 +1595,7 @@ out: func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") batch, err := createLocalAnnouncements(0) @@ -1578,7 +1607,9 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // Set up a channel we can use to inspect messages sent by the // gossiper to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{}, + } // Override NotifyWhenOnline to return the remote peer which we expect // meesages to be sent to. @@ -1772,7 +1803,7 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { ca, err := createRemoteChannelAnnouncement(0) require.NoError(t, err, "can't create remote channel announcement") - nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil} + nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil, atomic.Bool{}} announcements.AddMsgs(networkMsg{ msg: ca, peer: nodePeer, @@ -2004,7 +2035,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { timestamp = 123456 ) - ctx, err := createTestCtx(t, startingHeight) + ctx, err := createTestCtx(t, startingHeight, false) require.NoError(t, err, "can't create context") // We'll start off by processing a channel announcement without a proof @@ -2058,7 +2089,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { // process it. 
remoteChanAnn, err := createRemoteChannelAnnouncement(startingHeight - 1) require.NoError(t, err, "unable to create remote channel announcement") - peer := &mockPeer{pubKey, nil, nil} + peer := &mockPeer{pubKey, nil, nil, atomic.Bool{}} select { case err := <-ctx.gossiper.ProcessRemoteAnnouncement(remoteChanAnn, peer): @@ -2103,7 +2134,7 @@ func TestRejectZombieEdge(t *testing.T) { // We'll start by creating our test context with a batch of // announcements. - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "unable to create test context") batch, err := createRemoteAnnouncements(0) @@ -2204,7 +2235,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) { // We'll start by creating our test context with a batch of // announcements. - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "unable to create test context") batch, err := createRemoteAnnouncements(0) @@ -2361,7 +2392,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) { func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") batch, err := createLocalAnnouncements(0) @@ -2373,7 +2404,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages sent // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // Override NotifyWhenOnline to return the remote peer which we expect // messages to be sent to. @@ -2558,10 +2591,12 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { func TestExtraDataChannelAnnouncementValidation(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") - remotePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + remotePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } // We'll now create an announcement that contains an extra set of bytes // that we don't know of ourselves, but should still include in the @@ -2589,10 +2624,12 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) { t.Parallel() timestamp := testTimestamp - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") - remotePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + remotePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } // In this scenario, we'll create two announcements, one regular // channel announcement, and another channel update announcement, that @@ -2640,10 +2677,12 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) { func TestExtraDataNodeAnnouncementValidation(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") - remotePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + remotePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } timestamp := testTimestamp // We'll create a node announcement that includes a set of opaque data @@ -2708,7 +2747,7 @@ func assertProcessAnnouncement(t *testing.T, result chan error) { func TestRetransmit(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, 
proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") batch, err := createLocalAnnouncements(0) @@ -2716,7 +2755,7 @@ func TestRetransmit(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Process a local channel announcement, channel update and node // announcement. No messages should be broadcasted yet, since no proof @@ -2814,7 +2853,7 @@ func TestRetransmit(t *testing.T) { func TestNodeAnnouncementNoChannels(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") batch, err := createRemoteAnnouncements(0) @@ -2822,7 +2861,7 @@ func TestNodeAnnouncementNoChannels(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Process the remote node announcement. select { @@ -2899,14 +2938,14 @@ func TestNodeAnnouncementNoChannels(t *testing.T) { func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "can't create context") processRemoteAnnouncement := ctx.gossiper.ProcessRemoteAnnouncement chanUpdateHeight := uint32(0) timestamp := uint32(123456) - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} // In this scenario, we'll test whether the message flags field in a // channel update is properly handled. @@ -2998,7 +3037,7 @@ func TestSendChannelUpdateReliably(t *testing.T) { // We'll start by creating our test context and a batch of // announcements. - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "unable to create test context") batch, err := createLocalAnnouncements(0) @@ -3013,7 +3052,9 @@ func TestSendChannelUpdateReliably(t *testing.T) { // Set up a channel we can use to inspect messages sent by the // gossiper to the remote peer. sentToPeer := make(chan lnwire.Message, 1) - remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{}, + } // Since we first wait to be notified of the peer before attempting to // send the message, we'll overwrite NotifyWhenOnline and @@ -3350,7 +3391,7 @@ func TestPropagateChanPolicyUpdate(t *testing.T) { // First, we'll make out test context and add 3 random channels to the // graph. 
startingHeight := uint32(10) - ctx, err := createTestCtx(t, startingHeight) + ctx, err := createTestCtx(t, startingHeight, false) require.NoError(t, err, "unable to create test context") const numChannels = 3 @@ -3367,7 +3408,9 @@ func TestPropagateChanPolicyUpdate(t *testing.T) { remoteKey := remoteKeyPriv1.PubKey() sentMsgs := make(chan lnwire.Message, 10) - remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + remotePeer := &mockPeer{ + remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{}, + } // The forced code path for sending the private ChannelUpdate to the // remote peer will be hit, forcing it to request a notification that @@ -3529,7 +3572,7 @@ func TestProcessChannelAnnouncementOptionalMsgFields(t *testing.T) { // We'll start by creating our test context and a set of test channel // announcements. - ctx, err := createTestCtx(t, 0) + ctx, err := createTestCtx(t, 0, false) require.NoError(t, err, "unable to create test context") chanAnn1 := createAnnouncementWithoutProof( @@ -3590,7 +3633,7 @@ func assertMessage(t *testing.T, expected, got lnwire.Message) { func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) { // Create our test harness. const blockHeight = 100 - ctx, err := createTestCtx(t, blockHeight) + ctx, err := createTestCtx(t, blockHeight, false) require.NoError(t, err, "can't create context") const subBatchSize = 10 @@ -3702,7 +3745,7 @@ func (m *SyncManager) markGraphSyncing() { func TestBroadcastAnnsAfterGraphSynced(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, 10) + ctx, err := createTestCtx(t, 10, false) require.NoError(t, err, "can't create context") // We'll mark the graph as not synced. This should prevent us from @@ -3715,7 +3758,9 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) { t.Helper() - nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } var errChan chan error if isRemote { errChan = ctx.gossiper.ProcessRemoteAnnouncement( @@ -3775,7 +3820,7 @@ func TestRateLimitChannelUpdates(t *testing.T) { // Create our test harness. 
const blockHeight = 100 - ctx, err := createTestCtx(t, blockHeight) + ctx, err := createTestCtx(t, blockHeight, false) require.NoError(t, err, "can't create context") ctx.gossiper.cfg.RebroadcastInterval = time.Hour ctx.gossiper.cfg.MaxChannelUpdateBurst = 5 @@ -3791,7 +3836,9 @@ func TestRateLimitChannelUpdates(t *testing.T) { batch, err := createRemoteAnnouncements(blockHeight) require.NoError(t, err) - nodePeer1 := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil} + nodePeer1 := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } select { case err := <-ctx.gossiper.ProcessRemoteAnnouncement( batch.chanAnn, nodePeer1, @@ -3810,7 +3857,9 @@ func TestRateLimitChannelUpdates(t *testing.T) { t.Fatal("remote announcement not processed") } - nodePeer2 := &mockPeer{remoteKeyPriv2.PubKey(), nil, nil} + nodePeer2 := &mockPeer{ + remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{}, + } select { case err := <-ctx.gossiper.ProcessRemoteAnnouncement( batch.chanUpdAnn2, nodePeer2, @@ -3921,7 +3970,7 @@ func TestRateLimitChannelUpdates(t *testing.T) { func TestIgnoreOwnAnnouncement(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") batch, err := createLocalAnnouncements(0) @@ -3929,7 +3978,7 @@ func TestIgnoreOwnAnnouncement(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Try to let the remote peer tell us about the channel we are part of. select { @@ -4065,7 +4114,7 @@ func TestIgnoreOwnAnnouncement(t *testing.T) { func TestRejectCacheChannelAnn(t *testing.T) { t.Parallel() - ctx, err := createTestCtx(t, proofMatureDelta) + ctx, err := createTestCtx(t, proofMatureDelta, false) require.NoError(t, err, "can't create context") // First, we create a channel announcement to send over to our test @@ -4075,7 +4124,7 @@ func TestRejectCacheChannelAnn(t *testing.T) { remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:]) require.NoError(t, err, "unable to parse pubkey") - remotePeer := &mockPeer{remoteKey, nil, nil} + remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}} // Before sending over the announcement, we'll modify it such that we // know it will always fail. @@ -4139,3 +4188,134 @@ func TestFutureMsgCacheEviction(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 2, item.height, "should be the second item") } + +// TestChanAnnBanningNonChanPeer asserts that non-channel peers who send bogus +// channel announcements are banned properly. +func TestChanAnnBanningNonChanPeer(t *testing.T) { + t.Parallel() + + ctx, err := createTestCtx(t, 1000, false) + require.NoError(t, err, "can't create context") + + nodePeer1 := &mockPeer{ + remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}, + } + nodePeer2 := &mockPeer{ + remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{}, + } + + ctx.router.setAddEdgeErrCode(graph.ErrInvalidFundingOutput) + + // Loop 100 times to get nodePeer banned. + for i := 0; i < 100; i++ { + // Craft a valid channel announcement for a channel we don't + // have. We will ensure that it fails validation by modifying + // the router. 
+ ca, err := createRemoteChannelAnnouncement(uint32(i)) + require.NoError(t, err, "can't create channel announcement") + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement( + ca, nodePeer1, + ): + require.True( + t, graph.IsError( + err, graph.ErrInvalidFundingOutput, + ), + ) + + case <-time.After(2 * time.Second): + t.Fatalf("remote announcement not processed") + } + } + + // The peer should be banned now. + require.True(t, ctx.gossiper.isBanned(nodePeer1.PubKey())) + + // Assert that nodePeer has been disconnected. + require.True(t, nodePeer1.disconnected.Load()) + + ca, err := createRemoteChannelAnnouncement(101) + require.NoError(t, err, "can't create channel announcement") + + // Set the error to ErrChannelSpent so that we can test that the + // gossiper ignores closed channels. + ctx.router.setAddEdgeErrCode(graph.ErrChannelSpent) + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer2): + require.True(t, graph.IsError(err, graph.ErrChannelSpent)) + + case <-time.After(2 * time.Second): + t.Fatalf("remote announcement not processed") + } + + // Check that the announcement's scid is marked as closed. + isClosed, err := ctx.gossiper.cfg.ScidCloser.IsClosedScid( + ca.ShortChannelID, + ) + require.Nil(t, err) + require.True(t, isClosed) + + // Remove the scid from the reject cache. + key := newRejectCacheKey( + ca.ShortChannelID.ToUint64(), + sourceToPub(nodePeer2.IdentityKey()), + ) + + ctx.gossiper.recentRejects.Delete(key) + + // Reset the AddEdge error and pass the same announcement again. An + // error should be returned even though AddEdge won't fail. + ctx.router.resetAddEdgeErrCode() + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer2): + require.NotNil(t, err) + + case <-time.After(2 * time.Second): + t.Fatalf("remote announcement not processed") + } +} + +// TestChanAnnBanningChanPeer asserts that channel peers that are banned don't +// get disconnected. +func TestChanAnnBanningChanPeer(t *testing.T) { + t.Parallel() + + ctx, err := createTestCtx(t, 1000, true) + require.NoError(t, err, "can't create context") + + nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}} + + ctx.router.setAddEdgeErrCode(graph.ErrInvalidFundingOutput) + + // Loop 100 times to get nodePeer banned. + for i := 0; i < 100; i++ { + // Craft a valid channel announcement for a channel we don't + // have. We will ensure that it fails validation by modifying + // the router. + ca, err := createRemoteChannelAnnouncement(uint32(i)) + require.NoError(t, err, "can't create channel announcement") + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement( + ca, nodePeer, + ): + require.True( + t, graph.IsError( + err, graph.ErrInvalidFundingOutput, + ), + ) + + case <-time.After(2 * time.Second): + t.Fatalf("remote announcement not processed") + } + } + + // The peer should be banned now. + require.True(t, ctx.gossiper.isBanned(nodePeer.PubKey())) + + // Assert that the peer wasn't disconnected. + require.False(t, nodePeer.disconnected.Load()) +} diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 4f5c5d4e46..6bd93c29b7 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -4,6 +4,7 @@ import ( "errors" "net" "sync" + "sync/atomic" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" @@ -14,9 +15,10 @@ import ( // mockPeer implements the lnpeer.Peer interface and is used to test the // gossiper's interaction with peers. 
type mockPeer struct { - pk *btcec.PublicKey - sentMsgs chan lnwire.Message - quit chan struct{} + pk *btcec.PublicKey + sentMsgs chan lnwire.Message + quit chan struct{} + disconnected atomic.Bool } var _ lnpeer.Peer = (*mockPeer)(nil) @@ -74,6 +76,10 @@ func (p *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error { return nil } +func (p *mockPeer) Disconnect(err error) { + p.disconnected.Store(true) +} + // mockMessageStore is an in-memory implementation of the MessageStore interface // used for the gossiper's unit tests. type mockMessageStore struct { @@ -155,3 +161,40 @@ func (s *mockMessageStore) MessagesForPeer(pubKey [33]byte) ([]lnwire.Message, e return msgs, nil } + +type mockScidCloser struct { + m map[lnwire.ShortChannelID]struct{} + channelPeer bool + + sync.Mutex +} + +func newMockScidCloser(channelPeer bool) *mockScidCloser { + return &mockScidCloser{ + m: make(map[lnwire.ShortChannelID]struct{}), + channelPeer: channelPeer, + } +} + +func (m *mockScidCloser) PutClosedScid(scid lnwire.ShortChannelID) error { + m.Lock() + m.m[scid] = struct{}{} + m.Unlock() + + return nil +} + +func (m *mockScidCloser) IsClosedScid(scid lnwire.ShortChannelID) (bool, + error) { + + m.Lock() + defer m.Unlock() + + _, ok := m.m[scid] + + return ok, nil +} + +func (m *mockScidCloser) IsChannelPeer(pubkey *btcec.PublicKey) (bool, error) { + return m.channelPeer, nil +} diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go index d1e69b11fb..19fdaa1cad 100644 --- a/discovery/reliable_sender_test.go +++ b/discovery/reliable_sender_test.go @@ -2,6 +2,7 @@ package discovery import ( "fmt" + "sync/atomic" "testing" "time" @@ -74,7 +75,7 @@ func TestReliableSenderFlow(t *testing.T) { // Create a mock peer to send the messages to. pubKey := randPubKey(t) msgsSent := make(chan lnwire.Message) - peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit, atomic.Bool{}} // Override NotifyWhenOnline and NotifyWhenOffline to provide the // notification channels so that we can control when notifications get @@ -193,7 +194,7 @@ func TestReliableSenderStaleMessages(t *testing.T) { // Create a mock peer to send the messages to. pubKey := randPubKey(t) msgsSent := make(chan lnwire.Message) - peer := &mockPeer{pubKey, msgsSent, reliableSender.quit} + peer := &mockPeer{pubKey, msgsSent, reliableSender.quit, atomic.Bool{}} // Override NotifyWhenOnline to provide the notification channel so that // we can control when notifications get dispatched. diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 39098f70e0..70d28784b8 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -22,6 +22,9 @@ const ( // force a historical sync to ensure we have as much of the public // network as possible. DefaultHistoricalSyncInterval = time.Hour + + // filterSemaSize is the capacity of gossipFilterSema. + filterSemaSize = 5 ) var ( @@ -161,12 +164,22 @@ type SyncManager struct { // duration of the connection. pinnedActiveSyncers map[route.Vertex]*GossipSyncer + // gossipFilterSema contains semaphores for the gossip timestamp + // queries. + gossipFilterSema chan struct{} + wg sync.WaitGroup quit chan struct{} } // newSyncManager constructs a new SyncManager backed by the given config. 
func newSyncManager(cfg *SyncManagerCfg) *SyncManager { + + filterSema := make(chan struct{}, filterSemaSize) + for i := 0; i < filterSemaSize; i++ { + filterSema <- struct{}{} + } + return &SyncManager{ cfg: *cfg, newSyncers: make(chan *newSyncer), @@ -178,7 +191,8 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager { pinnedActiveSyncers: make( map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers), ), - quit: make(chan struct{}), + gossipFilterSema: filterSema, + quit: make(chan struct{}), } } @@ -507,7 +521,7 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { maxQueryChanRangeReplies: maxQueryChanRangeReplies, noTimestampQueryOption: m.cfg.NoTimestampQueries, isStillZombieChannel: m.cfg.IsStillZombieChannel, - }) + }, m.gossipFilterSema) // Gossip syncers are initialized by default in a PassiveSync type // and chansSynced state so that they can reply to any peer queries or @@ -561,7 +575,7 @@ func (m *SyncManager) removeGossipSyncer(peer route.Vertex) { return } - log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)", + log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)", peer, newActiveSyncer.cfg.peerPub) } diff --git a/discovery/syncer.go b/discovery/syncer.go index b910151cb1..b6adb447a6 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/graph" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/time/rate" @@ -180,9 +181,6 @@ const ( // requestBatchSize is the maximum number of channels we will query the // remote peer for in a QueryShortChanIDs message. requestBatchSize = 500 - - // filterSemaSize is the capacity of gossipFilterSema. - filterSemaSize = 5 ) var ( @@ -399,9 +397,11 @@ type GossipSyncer struct { // GossipSyncer reaches its terminal chansSynced state. syncedSignal chan struct{} - sync.Mutex + // syncerSema is used to more finely control the syncer's ability to + // respond to gossip timestamp range messages. + syncerSema chan struct{} - gossipFilterSema chan struct{} + sync.Mutex quit chan struct{} wg sync.WaitGroup @@ -409,7 +409,7 @@ type GossipSyncer struct { // newGossipSyncer returns a new instance of the GossipSyncer populated using // the passed config. -func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer { +func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer { // If no parameter was specified for max undelayed query replies, set it // to the default of 5 queries. if cfg.maxUndelayedQueryReplies <= 0 { @@ -431,11 +431,6 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer { interval, cfg.maxUndelayedQueryReplies, ) - filterSema := make(chan struct{}, filterSemaSize) - for i := 0; i < filterSemaSize; i++ { - filterSema <- struct{}{} - } - return &GossipSyncer{ cfg: cfg, rateLimiter: rateLimiter, @@ -443,7 +438,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer { historicalSyncReqs: make(chan *historicalSyncReq), gossipMsgs: make(chan lnwire.Message, 100), queryMsgs: make(chan lnwire.Message, 100), - gossipFilterSema: filterSema, + syncerSema: sema, quit: make(chan struct{}), } } @@ -787,6 +782,16 @@ func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange, // reply to the initial range query to discover new channels that it didn't // previously know of. 
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error { + // isStale returns whether the timestamp is too far into the past. + isStale := func(timestamp time.Time) bool { + return time.Since(timestamp) > graph.DefaultChannelPruneExpiry + } + + // isSkewed returns whether the timestamp is too far into the future. + isSkewed := func(timestamp time.Time) bool { + return time.Until(timestamp) > graph.DefaultChannelPruneExpiry + } + // If we're not communicating with a legacy node, we'll apply some // further constraints on their reply to ensure it satisfies our query. if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) { @@ -838,9 +843,9 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro } for i, scid := range msg.ShortChanIDs { - info := channeldb.ChannelUpdateInfo{ - ShortChannelID: scid, - } + info := channeldb.NewChannelUpdateInfo( + scid, time.Time{}, time.Time{}, + ) if len(msg.Timestamps) != 0 { t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0) @@ -848,6 +853,32 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0) info.Node2UpdateTimestamp = t2 + + // Sort out all channels with outdated or skewed + // timestamps. Both timestamps need to be out of + // boundaries for us to skip the channel and not query + // it later on. + switch { + case isStale(info.Node1UpdateTimestamp) && + isStale(info.Node2UpdateTimestamp): + + continue + + case isSkewed(info.Node1UpdateTimestamp) && + isSkewed(info.Node2UpdateTimestamp): + + continue + + case isStale(info.Node1UpdateTimestamp) && + isSkewed(info.Node2UpdateTimestamp): + + continue + + case isStale(info.Node2UpdateTimestamp) && + isSkewed(info.Node1UpdateTimestamp): + + continue + } } g.bufferedChanRangeReplies = append( @@ -1295,12 +1326,25 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er return nil } + select { + case <-g.syncerSema: + case <-g.quit: + return ErrGossipSyncerExiting + } + + // We don't put this in a defer because if the goroutine is launched, + // it needs to be called when the goroutine is stopped. + returnSema := func() { + g.syncerSema <- struct{}{} + } + // Now that the remote peer has applied their filter, we'll query the // database for all the messages that are beyond this filter. newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon( g.cfg.chainHash, startTime, endTime, ) if err != nil { + returnSema() return err } @@ -1310,22 +1354,15 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er // If we don't have any to send, then we can return early. if len(newUpdatestoSend) == 0 { + returnSema() return nil } - select { - case <-g.gossipFilterSema: - case <-g.quit: - return ErrGossipSyncerExiting - } - // We'll conclude by launching a goroutine to send out any updates. 
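The four-case switch above skips a channel only when both node timestamps are unusable, i.e. each of them is either stale or skewed. Assuming a two-week `graph.DefaultChannelPruneExpiry`, the same decision can be expressed more compactly; this is an illustrative sketch, not the syncer's code.

```go
package syncsketch

import "time"

// pruneExpiry mirrors the assumed two-week graph.DefaultChannelPruneExpiry.
const pruneExpiry = 14 * 24 * time.Hour

// outsideWindow reports whether a channel update timestamp is unusable:
// either too far in the past (stale) or too far in the future (skewed).
func outsideWindow(ts time.Time) bool {
	stale := time.Since(ts) > pruneExpiry
	skewed := time.Until(ts) > pruneExpiry

	return stale || skewed
}

// skipChannel reports whether an SCID from a ReplyChannelRange reply should
// be dropped instead of queried later: both node timestamps must be unusable.
func skipChannel(node1, node2 time.Time) bool {
	return outsideWindow(node1) && outsideWindow(node2)
}
```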
g.wg.Add(1) go func() { defer g.wg.Done() - defer func() { - g.gossipFilterSema <- struct{}{} - }() + defer returnSema() for _, msg := range newUpdatestoSend { err := g.cfg.sendToPeerSync(msg) diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 7a649f466f..bb6aec5907 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -211,7 +211,11 @@ func newTestSyncer(hID lnwire.ShortChannelID, markGraphSynced: func() {}, maxQueryChanRangeReplies: maxQueryChanRangeReplies, } - syncer := newGossipSyncer(cfg) + + syncerSema := make(chan struct{}, 1) + syncerSema <- struct{}{} + + syncer := newGossipSyncer(cfg, syncerSema) return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries) } @@ -1229,6 +1233,12 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { query, err := syncer.genChanRangeQuery(true) require.NoError(t, err, "unable to generate channel range query") + currentTimestamp := time.Now().Unix() + // Timestamp more than 2 weeks in the past hence expired. + expiredTimestamp := time.Unix(0, 0).Unix() + // Timestamp three weeks in the future. + skewedTimestamp := time.Now().Add(time.Hour * 24 * 18).Unix() + // When interpreting block ranges, the first reply should start from // our requested first block, and the last should end at our requested // last block. @@ -1253,14 +1263,78 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { }, { FirstBlockHeight: 12, - NumBlocks: query.NumBlocks - 12, - Complete: 1, + NumBlocks: 1, ShortChanIDs: []lnwire.ShortChannelID{ { BlockHeight: 12, }, }, }, + { + FirstBlockHeight: 13, + NumBlocks: query.NumBlocks - 13, + Complete: 1, + ShortChanIDs: []lnwire.ShortChannelID{ + { + BlockHeight: 13, + TxIndex: 1, + }, + { + BlockHeight: 13, + TxIndex: 2, + }, + { + BlockHeight: 13, + TxIndex: 3, + }, + { + BlockHeight: 13, + TxIndex: 4, + }, + { + BlockHeight: 13, + TxIndex: 5, + }, + { + BlockHeight: 13, + TxIndex: 6, + }, + }, + Timestamps: []lnwire.ChanUpdateTimestamps{ + { + // Both timestamps are valid. + Timestamp1: uint32(currentTimestamp), + Timestamp2: uint32(currentTimestamp), + }, + { + // One of the timestamps is valid. + Timestamp1: uint32(currentTimestamp), + Timestamp2: uint32(expiredTimestamp), + }, + { + // Both timestamps are expired. + Timestamp1: uint32(expiredTimestamp), + Timestamp2: uint32(expiredTimestamp), + }, + { + // Both timestamps are skewed. + Timestamp1: uint32(skewedTimestamp), + Timestamp2: uint32(skewedTimestamp), + }, + { + // One timestamp is skewed the other + // expired. + Timestamp1: uint32(expiredTimestamp), + Timestamp2: uint32(skewedTimestamp), + }, + { + // One timestamp is skewed the other + // expired. 
+ Timestamp1: uint32(skewedTimestamp), + Timestamp2: uint32(expiredTimestamp), + }, + }, + }, } // Each reply query is the same as the original query in the legacy @@ -1274,6 +1348,9 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { replies[2].FirstBlockHeight = query.FirstBlockHeight replies[2].NumBlocks = query.NumBlocks + + replies[3].FirstBlockHeight = query.FirstBlockHeight + replies[3].NumBlocks = query.NumBlocks } // We'll begin by sending the syncer a set of non-complete channel @@ -1284,6 +1361,9 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { if err := syncer.processChanRangeReply(replies[1]); err != nil { t.Fatalf("unable to process reply: %v", err) } + if err := syncer.processChanRangeReply(replies[2]); err != nil { + t.Fatalf("unable to process reply: %v", err) + } // At this point, we should still be in our starting state as the query // hasn't finished. @@ -1301,6 +1381,14 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { { BlockHeight: 12, }, + { + BlockHeight: 13, + TxIndex: 1, + }, + { + BlockHeight: 13, + TxIndex: 2, + }, } // As we're about to send the final response, we'll launch a goroutine @@ -1335,7 +1423,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { // If we send the final message, then we should transition to // queryNewChannels as we've sent a non-empty set of new channels. - if err := syncer.processChanRangeReply(replies[2]); err != nil { + if err := syncer.processChanRangeReply(replies[3]); err != nil { t.Fatalf("unable to process reply: %v", err) } diff --git a/docs/release-notes/release-notes-0.18.3.md b/docs/release-notes/release-notes-0.18.3.md index f5225a2111..6389e26329 100644 --- a/docs/release-notes/release-notes-0.18.3.md +++ b/docs/release-notes/release-notes-0.18.3.md @@ -64,8 +64,26 @@ commitment when the channel was force closed. cause UpdateAddHTLC message with blinding point fields to not be re-forwarded correctly on restart. +* [A bug has been fixed that could cause invalid channel + announcements](https://github.com/lightningnetwork/lnd/pull/9002) to be + generated if the inbound fee discount is used. + +* [Fixed](https://github.com/lightningnetwork/lnd/pull/9011) a timestamp issue + in the `ReplyChannelRange` msg and introduced a check that discards ChanUpdates + with a timestamp too far in the future. + +* [Fixed](https://github.com/lightningnetwork/lnd/pull/9026) a bug where we +would create a blinded route with a minHTLC greater than the actual payment +amount. Moreover, the strict correlation between min_cltv_delta and the +blinded path expiry has been removed. + # New Features ## Functional Enhancements + +* LND will now [temporarily ban peers](https://github.com/lightningnetwork/lnd/pull/9009) +that send too many invalid `ChannelAnnouncement` messages. This is only done for +LND nodes that validate `ChannelAnnouncement` messages.
+ ## RPC Additions * The [SendPaymentRequest](https://github.com/lightningnetwork/lnd/pull/8734) diff --git a/funding/manager_test.go b/funding/manager_test.go index 9db175ec39..c4c8b4f36c 100644 --- a/funding/manager_test.go +++ b/funding/manager_test.go @@ -283,6 +283,8 @@ type testNode struct { var _ lnpeer.Peer = (*testNode)(nil) +func (n *testNode) Disconnect(err error) {} + func (n *testNode) IdentityKey() *btcec.PublicKey { return n.addr.IdentityKey } diff --git a/graph/builder.go b/graph/builder.go index 06e86b24bd..82a36eb36a 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -681,7 +681,7 @@ func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier, update.err <- err case IsError(err, ErrParentValidationFailed): - update.err <- newErrf(ErrIgnored, err.Error()) + update.err <- NewErrf(ErrIgnored, err.Error()) //nolint default: log.Warnf("unexpected error during validation "+ @@ -1053,7 +1053,7 @@ func (b *Builder) assertNodeAnnFreshness(node route.Vertex, "existence of node: %v", err) } if !exists { - return newErrf(ErrIgnored, "Ignoring node announcement"+ + return NewErrf(ErrIgnored, "Ignoring node announcement"+ " for node not found in channel graph (%x)", node[:]) } @@ -1063,7 +1063,7 @@ func (b *Builder) assertNodeAnnFreshness(node route.Vertex, // if not then we won't accept the new data as it would override newer // data. if !lastUpdate.Before(msgTimestamp) { - return newErrf(ErrOutdated, "Ignoring outdated "+ + return NewErrf(ErrOutdated, "Ignoring outdated "+ "announcement for %x", node[:]) } @@ -1193,11 +1193,11 @@ func (b *Builder) processUpdate(msg interface{}, "existence: %v", err) } if isZombie { - return newErrf(ErrIgnored, "ignoring msg for zombie "+ + return NewErrf(ErrIgnored, "ignoring msg for zombie "+ "chan_id=%v", msg.ChannelID) } if exists { - return newErrf(ErrIgnored, "ignoring msg for known "+ + return NewErrf(ErrIgnored, "ignoring msg for known "+ "chan_id=%v", msg.ChannelID) } @@ -1259,7 +1259,7 @@ func (b *Builder) processUpdate(msg interface{}, default: } - return newErrf(ErrNoFundingTransaction, "unable to "+ + return NewErrf(ErrNoFundingTransaction, "unable to "+ "locate funding tx: %v", err) } @@ -1294,7 +1294,7 @@ func (b *Builder) processUpdate(msg interface{}, return err } - return newErrf(ErrInvalidFundingOutput, "output "+ + return NewErrf(ErrInvalidFundingOutput, "output "+ "failed validation: %w", err) } @@ -1313,7 +1313,7 @@ func (b *Builder) processUpdate(msg interface{}, } } - return newErrf(ErrChannelSpent, "unable to fetch utxo "+ + return NewErrf(ErrChannelSpent, "unable to fetch utxo "+ "for chan_id=%v, chan_point=%v: %v", msg.ChannelID, fundingPoint, err) } @@ -1378,7 +1378,7 @@ func (b *Builder) processUpdate(msg interface{}, b.cfg.ChannelPruneExpiry if isZombie && isStaleUpdate { - return newErrf(ErrIgnored, "ignoring stale update "+ + return NewErrf(ErrIgnored, "ignoring stale update "+ "(flags=%v|%v) for zombie chan_id=%v", msg.MessageFlags, msg.ChannelFlags, msg.ChannelID) @@ -1387,7 +1387,7 @@ func (b *Builder) processUpdate(msg interface{}, // If the channel doesn't exist in our database, we cannot // apply the updated policy. if !exists { - return newErrf(ErrIgnored, "ignoring update "+ + return NewErrf(ErrIgnored, "ignoring update "+ "(flags=%v|%v) for unknown chan_id=%v", msg.MessageFlags, msg.ChannelFlags, msg.ChannelID) @@ -1405,7 +1405,7 @@ func (b *Builder) processUpdate(msg interface{}, // Ignore outdated message. 
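With the error helpers exported, packages outside `graph` can build typed errors via `graph.NewErrf` and branch on them with `graph.IsError`, as the gossiper tests earlier in this diff do. A small usage sketch; the two functions below are hypothetical callers, not lnd code.

```go
package callersketch

import "github.com/lightningnetwork/lnd/graph"

// rejectStaleUpdate wraps a failure with a graph error code so callers can
// distinguish benign rejections from real failures.
func rejectStaleUpdate(chanID uint64) error {
	return graph.NewErrf(graph.ErrOutdated, "ignoring outdated update "+
		"for chan_id=%v", chanID)
}

// isBenign reports whether an error only signals that an update was ignored
// or outdated, rather than a hard failure.
func isBenign(err error) bool {
	return graph.IsError(err, graph.ErrIgnored, graph.ErrOutdated)
}
```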
if !edge1Timestamp.Before(msg.LastUpdate) { - return newErrf(ErrOutdated, "Ignoring "+ + return NewErrf(ErrOutdated, "Ignoring "+ "outdated update (flags=%v|%v) for "+ "known chan_id=%v", msg.MessageFlags, msg.ChannelFlags, msg.ChannelID) @@ -1417,7 +1417,7 @@ func (b *Builder) processUpdate(msg interface{}, // Ignore outdated message. if !edge2Timestamp.Before(msg.LastUpdate) { - return newErrf(ErrOutdated, "Ignoring "+ + return NewErrf(ErrOutdated, "Ignoring "+ "outdated update (flags=%v|%v) for "+ "known chan_id=%v", msg.MessageFlags, msg.ChannelFlags, msg.ChannelID) diff --git a/graph/builder_test.go b/graph/builder_test.go index 600bd86344..f6c5dcf9cb 100644 --- a/graph/builder_test.go +++ b/graph/builder_test.go @@ -1275,7 +1275,7 @@ func newChannelEdgeInfo(t *testing.T, ctx *testCtx, fundingHeight uint32, } func assertChanChainRejection(t *testing.T, ctx *testCtx, - edge *models.ChannelEdgeInfo, failCode errorCode) { + edge *models.ChannelEdgeInfo, failCode ErrorCode) { t.Helper() diff --git a/graph/errors.go b/graph/errors.go index c0d6b8904a..0a1d6fd244 100644 --- a/graph/errors.go +++ b/graph/errors.go @@ -2,14 +2,14 @@ package graph import "github.com/go-errors/errors" -// errorCode is used to represent the various errors that can occur within this +// ErrorCode is used to represent the various errors that can occur within this // package. -type errorCode uint8 +type ErrorCode uint8 const ( // ErrOutdated is returned when the routing update already have // been applied, or a newer update is already known. - ErrOutdated errorCode = iota + ErrOutdated ErrorCode = iota // ErrIgnored is returned when the update have been ignored because // this update can't bring us something new, or because a node @@ -39,27 +39,27 @@ const ( ErrParentValidationFailed ) -// graphError is a structure that represent the error inside the graph package, +// Error is a structure that represent the error inside the graph package, // this structure carries additional information about error code in order to // be able distinguish errors outside of the current package. -type graphError struct { +type Error struct { err *errors.Error - code errorCode + code ErrorCode } // Error represents errors as the string // NOTE: Part of the error interface. -func (e *graphError) Error() string { +func (e *Error) Error() string { return e.err.Error() } -// A compile time check to ensure graphError implements the error interface. -var _ error = (*graphError)(nil) +// A compile time check to ensure Error implements the error interface. +var _ error = (*Error)(nil) -// newErrf creates a graphError by the given error formatted description and +// NewErrf creates a Error by the given error formatted description and // its corresponding error code. -func newErrf(code errorCode, format string, a ...interface{}) *graphError { - return &graphError{ +func NewErrf(code ErrorCode, format string, a ...interface{}) *Error { + return &Error{ code: code, err: errors.Errorf(format, a...), } @@ -67,8 +67,8 @@ func newErrf(code errorCode, format string, a ...interface{}) *graphError { // IsError is a helper function which is needed to have ability to check that // returned error has specific error code. 
-func IsError(e interface{}, codes ...errorCode) bool { - err, ok := e.(*graphError) +func IsError(e interface{}, codes ...ErrorCode) bool { + err, ok := e.(*Error) if !ok { return false } diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go index 2f3c8c02ce..731852d753 100644 --- a/graph/validation_barrier.go +++ b/graph/validation_barrier.go @@ -238,12 +238,12 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // is closed, or the set of jobs exits. select { case <-v.quit: - return newErrf(ErrVBarrierShuttingDown, + return NewErrf(ErrVBarrierShuttingDown, "validation barrier shutting down") case <-signals.deny: log.Debugf("Signal deny for %s", jobDesc) - return newErrf(ErrParentValidationFailed, + return NewErrf(ErrParentValidationFailed, "parent validation failed") case <-signals.allow: diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 36f273112a..f50df8cd5e 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2072,6 +2072,8 @@ func (m *mockPeer) QuitSignal() <-chan struct{} { return m.quit } +func (m *mockPeer) Disconnect(err error) {} + var _ lnpeer.Peer = (*mockPeer)(nil) func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 96417d9c05..ed05a31655 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -684,6 +684,8 @@ func (s *mockServer) RemoteFeatures() *lnwire.FeatureVector { return nil } +func (s *mockServer) Disconnect(err error) {} + func (s *mockServer) Stop() error { if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { return nil diff --git a/lnpeer/mock_peer.go b/lnpeer/mock_peer.go index 908c9a0c9c..7fed3e4a04 100644 --- a/lnpeer/mock_peer.go +++ b/lnpeer/mock_peer.go @@ -79,3 +79,5 @@ func (m *MockPeer) RemoteFeatures() *lnwire.FeatureVector { args := m.Called() return args.Get(0).(*lnwire.FeatureVector) } + +func (m *MockPeer) Disconnect(err error) {} diff --git a/lnpeer/peer.go b/lnpeer/peer.go index f7d2e971ba..cb6bc9867a 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -74,4 +74,7 @@ type Peer interface { // by the remote peer. This allows sub-systems that use this interface // to gate their behavior off the set of negotiated feature bits. RemoteFeatures() *lnwire.FeatureVector + + // Disconnect halts communication with the peer. + Disconnect(error) } diff --git a/routing/blindedpath/blinded_path.go b/routing/blindedpath/blinded_path.go index 19a6edcaa6..bc14daa40a 100644 --- a/routing/blindedpath/blinded_path.go +++ b/routing/blindedpath/blinded_path.go @@ -82,7 +82,16 @@ type BuildBlindedPathCfg struct { MinFinalCLTVExpiryDelta uint32 // BlocksUntilExpiry is the number of blocks that this blinded path - // should remain valid for. + // should remain valid for. This is a relative number of blocks. This + // number in addition with a potential minimum cltv delta for the last + // hop and some block padding will be the payment constraint which is + // part of the blinded hop info. Every htlc using the provided blinded + // hops cannot have a higher cltv delta otherwise it will get rejected + // by the forwarding nodes or the final node. + // + // This number should at least be greater than the invoice expiry time + // so that the blinded route is always valid as long as the invoice is + // valid. 
BlocksUntilExpiry uint32 + // MinNumHops is the minimum number of hops that each blinded path @@ -105,13 +114,6 @@ type BuildBlindedPathCfg struct { func BuildBlindedPaymentPaths(cfg *BuildBlindedPathCfg) ( []*zpay32.BlindedPaymentPath, error) { - if cfg.MinFinalCLTVExpiryDelta >= cfg.BlocksUntilExpiry { - return nil, fmt.Errorf("blinded path CLTV expiry delta (%d) "+ - "must be greater than the minimum final CLTV expiry "+ - "delta (%d)", cfg.BlocksUntilExpiry, - cfg.MinFinalCLTVExpiryDelta) - } - // Find some appropriate routes for the value to be routed. This will // return a set of routes made up of real nodes. routes, err := cfg.FindRoutes(cfg.ValueMsat) @@ -148,7 +150,7 @@ func BuildBlindedPaymentPaths(cfg *BuildBlindedPathCfg) ( continue } else if err != nil { log.Errorf("Not using route (%s) as a blinded path: %v", - err) + route, err) continue } @@ -435,11 +437,27 @@ func collectRelayInfo(cfg *BuildBlindedPathCfg, path *candidatePath) ( } } - policy, err = cfg.AddPolicyBuffer(policy) + if policy.MinHTLCMsat > cfg.ValueMsat { + return nil, 0, 0, fmt.Errorf("%w: minHTLC of hop "+ + "policy larger than payment amt: sentAmt(%v), "+ + "minHTLC(%v)", errInvalidBlindedPath, + cfg.ValueMsat, policy.MinHTLCMsat) + } + + bufferPolicy, err := cfg.AddPolicyBuffer(policy) if err != nil { return nil, 0, 0, err } + // We only use the new buffered policy if its minHTLC value does + // not exceed the payment amount. + // + // NOTE: We don't check this for maxHTLC, because the payment + // amount can always be split using MPP. + if bufferPolicy.MinHTLCMsat <= cfg.ValueMsat { + policy = bufferPolicy + } + // If this is the first policy we are collecting, then use this // policy to set the base values for min/max htlc. if len(hops) == 0 { diff --git a/server.go b/server.go index 555ee26274..33f1098484 100644 --- a/server.go +++ b/server.go @@ -1021,6 +1021,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, err } + scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB) + s.authGossiper = discovery.New(discovery.Config{ Graph: s.graphBuilder, Notifier: s.cc.ChainNotifier, @@ -1058,6 +1060,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, GetAlias: s.aliasMgr.GetPeerAlias, FindChannel: s.findChannel, IsStillZombieChannel: s.graphBuilder.IsZombieChannel, + ScidCloser: scidCloserMan, }, nodeKeyDesc) //nolint:lll @@ -3605,11 +3608,34 @@ func (s *server) InboundPeerConnected(conn net.Conn) { } nodePub := conn.(*brontide.Conn).RemotePub() - pubStr := string(nodePub.SerializeCompressed()) + pubSer := nodePub.SerializeCompressed() + pubStr := string(pubSer) + + var pubBytes [33]byte + copy(pubBytes[:], pubSer) s.mu.Lock() defer s.mu.Unlock() + // If the remote node's public key is banned, drop the connection. + shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub) + if dcErr != nil { + srvrLog.Errorf("Unable to check if we should disconnect "+ + "peer: %v", dcErr) + conn.Close() + + return + } + + if shouldDc { + srvrLog.Debugf("Dropping connection for %v since they are "+ + "banned.", pubSer) + + conn.Close() + + return + } + // If we already have an outbound connection to this peer, then ignore // this new connection.
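The `collectRelayInfo` change above only adopts the buffered policy when its minHTLC still admits the payment amount. A compact sketch of that guard with trimmed, illustrative types; the real code operates on the hop's full routing policy and `lnwire.MilliSatoshi` values.

```go
package blindedsketch

// policy carries only the field needed for the illustration.
type policy struct {
	MinHTLCMsat uint64
}

// pickPolicy prefers the buffered (relaxed) policy unless its minimum HTLC
// would exceed the amount being sent, in which case the original policy is
// kept. MaxHTLC is deliberately not checked: an MPP payment can be split
// across several shards.
func pickPolicy(orig, buffered policy, valueMsat uint64) policy {
	if buffered.MinHTLCMsat <= valueMsat {
		return buffered
	}

	return orig
}
```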
if p, ok := s.outboundPeers[pubStr]; ok { @@ -3692,11 +3718,38 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) } nodePub := conn.(*brontide.Conn).RemotePub() - pubStr := string(nodePub.SerializeCompressed()) + pubSer := nodePub.SerializeCompressed() + pubStr := string(pubSer) + + var pubBytes [33]byte + copy(pubBytes[:], pubSer) s.mu.Lock() defer s.mu.Unlock() + // If the remote node's public key is banned, drop the connection. + shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub) + if dcErr != nil { + srvrLog.Errorf("Unable to check if we should disconnect "+ + "peer: %v", dcErr) + conn.Close() + + return + } + + if shouldDc { + srvrLog.Debugf("Dropping connection for %v since they are "+ + "banned.", pubSer) + + if connReq != nil { + s.connMgr.Remove(connReq.ID()) + } + + conn.Close() + + return + } + // If we already have an inbound connection to this peer, then ignore // this new connection. if p, ok := s.inboundPeers[pubStr]; ok {
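Both `InboundPeerConnected` and `OutboundPeerConnected` now gate connections on the gossiper's ban list before any further bookkeeping. A minimal sketch of that pattern, assuming only the `ShouldDisconnect` method shown above; the surrounding server wiring is omitted.

```go
package serversketch

import (
	"net"

	"github.com/btcsuite/btcd/btcec/v2"
)

// banChecker is the narrow interface assumed here; the gossiper's
// ShouldDisconnect fills this role in the server.
type banChecker interface {
	ShouldDisconnect(pub *btcec.PublicKey) (bool, error)
}

// admitConnection closes the connection and reports false when the remote
// key is banned, or when the ban check itself fails (failing closed, as the
// server does).
func admitConnection(checker banChecker, remotePub *btcec.PublicKey,
	conn net.Conn) bool {

	shouldDC, err := checker.ShouldDisconnect(remotePub)
	if err != nil || shouldDC {
		conn.Close()
		return false
	}

	return true
}
```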