Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network): implement persistent peers functionality #1512

Merged
merged 12 commits into from
Apr 8, 2021
6 changes: 6 additions & 0 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot
cfg.NoMDNS = tomlCfg.NoMDNS
cfg.MinPeers = tomlCfg.MinPeers
cfg.MaxPeers = tomlCfg.MaxPeers
cfg.PersistentPeers = tomlCfg.PersistentPeers

// check --port flag and update node configuration
if port := ctx.GlobalUint(PortFlag.Name); port != 0 {
Expand Down Expand Up @@ -593,6 +594,10 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot
cfg.NoMDNS = true
}

if len(cfg.PersistentPeers) == 0 {
cfg.PersistentPeers = []string(nil)
}

logger.Debug(
"network configuration",
"port", cfg.Port,
Expand All @@ -602,6 +607,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot
"nomdns", cfg.NoMDNS,
"minpeers", cfg.MinPeers,
"maxpeers", cfg.MaxPeers,
"persistent-peers", cfg.PersistentPeers,
)
}

Expand Down
15 changes: 8 additions & 7 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ type AccountConfig struct {

// NetworkConfig is to marshal/unmarshal toml network config vars
type NetworkConfig struct {
Port uint32
Bootnodes []string
ProtocolID string
NoBootstrap bool
NoMDNS bool
MinPeers int
MaxPeers int
Port uint32
Bootnodes []string
ProtocolID string
NoBootstrap bool
NoMDNS bool
MinPeers int
MaxPeers int
PersistentPeers []string
}

// CoreConfig is to marshal/unmarshal toml core config vars
Expand Down
15 changes: 8 additions & 7 deletions dot/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ type AccountConfig struct {

// NetworkConfig is to marshal/unmarshal toml network config vars
type NetworkConfig struct {
Port uint32 `toml:"port,omitempty"`
Bootnodes []string `toml:"bootnodes,omitempty"`
ProtocolID string `toml:"protocol,omitempty"`
NoBootstrap bool `toml:"nobootstrap,omitempty"`
NoMDNS bool `toml:"nomdns,omitempty"`
MinPeers int `toml:"min-peers,omitempty"`
MaxPeers int `toml:"max-peers,omitempty"`
Port uint32 `toml:"port,omitempty"`
Bootnodes []string `toml:"bootnodes,omitempty"`
ProtocolID string `toml:"protocol,omitempty"`
NoBootstrap bool `toml:"nobootstrap,omitempty"`
NoMDNS bool `toml:"nomdns,omitempty"`
MinPeers int `toml:"min-peers,omitempty"`
MaxPeers int `toml:"max-peers,omitempty"`
PersistentPeers []string `toml:"persistent-peers,omitempty"`
}

// CoreConfig is to marshal/unmarshal toml core config vars
Expand Down
3 changes: 3 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Config struct {
MinPeers int
MaxPeers int

// PersistentPeers is a list of multiaddrs which the node should remain connected to
PersistentPeers []string

// privateKey the private key for the network p2p identity
privateKey crypto.PrivKey

Expand Down
65 changes: 39 additions & 26 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,29 @@ import (

// ConnManager implements connmgr.ConnManager
type ConnManager struct {
sync.Mutex
host *host
min, max int
disconnectHandler func(peer.ID)

// closeHandlerMap contains close handler corresponding to a protocol.
closeHandlerMap map[protocol.ID]func(peerID peer.ID)

protectedPeerMapMu sync.RWMutex
// protectedPeerMap contains a list of peers that are protected from pruning
// protectedPeers contains a list of peers that are protected from pruning
// when we reach the maximum numbers of peers.
protectedPeerMap map[peer.ID]struct{}
sync.Mutex
protectedPeers *sync.Map // map[peer.ID]struct{}

// persistentPeers contains peers we should remain connected to.
persistentPeers *sync.Map // map[peer.ID]struct{}
}

func newConnManager(min, max int) *ConnManager {
return &ConnManager{
min: min,
max: max,
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
protectedPeerMap: make(map[peer.ID]struct{}),
min: min,
max: max,
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
protectedPeers: new(sync.Map),
persistentPeers: new(sync.Map),
}
}

Expand Down Expand Up @@ -85,36 +89,23 @@ func (*ConnManager) TrimOpenConns(ctx context.Context) {}
// Protect peer will add the given peer to the protectedPeerMap which will
// protect the peer from pruning.
func (cm *ConnManager) Protect(id peer.ID, tag string) {
cm.protectedPeerMapMu.Lock()
defer cm.protectedPeerMapMu.Unlock()

cm.protectedPeerMap[id] = struct{}{}
cm.protectedPeers.Store(id, struct{}{})
}

// Unprotect peer will remove the given peer from prune protection.
// returns true if we have successfully removed the peer from the
// protectedPeerMap. False otherwise.
func (cm *ConnManager) Unprotect(id peer.ID, tag string) bool {
cm.protectedPeerMapMu.Lock()
defer cm.protectedPeerMapMu.Unlock()

_, ok := cm.protectedPeerMap[id]
if ok {
delete(cm.protectedPeerMap, id)
return true
}
return false
_, wasDeleted := cm.protectedPeers.LoadAndDelete(id)
return wasDeleted
}

// Close peer
func (*ConnManager) Close() error { return nil }

// IsProtected returns whether the given peer is protected from pruning or not.
func (cm *ConnManager) IsProtected(id peer.ID, tag string) (protected bool) {
cm.protectedPeerMapMu.RLock()
defer cm.protectedPeerMapMu.RUnlock()

_, ok := cm.protectedPeerMap[id]
_, ok := cm.protectedPeers.Load(id)
return ok
}

Expand All @@ -140,7 +131,7 @@ func (cm *ConnManager) ListenClose(n network.Network, addr ma.Multiaddr) {
func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
unprot := []peer.ID{}
for _, id := range peers {
if !cm.IsProtected(id, "") {
if !cm.IsProtected(id, "") && !cm.isPersistent(id) {
unprot = append(unprot, id)
}
}
Expand All @@ -159,6 +150,7 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
cm.Lock()
defer cm.Unlock()

// TODO: this should be updated to disconnect from (total_peers - maximum) peers, instead of just one peer
if len(n.Peers()) > cm.max {
unprotPeers := cm.unprotectedPeers(n.Peers())
if len(unprotPeers) == 0 {
Expand Down Expand Up @@ -188,6 +180,22 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
if cm.disconnectHandler != nil {
cm.disconnectHandler(c.RemotePeer())
}

if !cm.isPersistent(c.RemotePeer()) {
return
}

addrs := cm.host.h.Peerstore().Addrs(c.RemotePeer())
info := peer.AddrInfo{
ID: c.RemotePeer(),
Addrs: addrs,
}

err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
}

// TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers
}

Expand Down Expand Up @@ -224,3 +232,8 @@ func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) {
closeCB(s.Conn().RemotePeer())
}
}

func (cm *ConnManager) isPersistent(p peer.ID) bool {
_, ok := cm.persistentPeers.Load(p)
return ok
}
33 changes: 32 additions & 1 deletion dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package network
import (
"fmt"
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -64,7 +65,6 @@ func TestMaxPeers(t *testing.T) {

func TestProtectUnprotectPeer(t *testing.T) {
cm := newConnManager(1, 4)
require.Zero(t, len(cm.protectedPeerMap))

p1 := peer.ID("a")
p2 := peer.ID("b")
Expand All @@ -86,3 +86,34 @@ func TestProtectUnprotectPeer(t *testing.T) {
unprot = cm.unprotectedPeers([]peer.ID{p1, p2, p3, p4})
require.Equal(t, unprot, []peer.ID{p1, p2, p3, p4})
}

func TestPersistentPeers(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "node-a"),
Port: 7000,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
nodeA := createTestService(t, configA)

addrs := nodeA.host.multiaddrs()
configB := &Config{
BasePath: utils.NewTestBasePath(t, "node-b"),
Port: 7001,
RandSeed: 2,
NoMDNS: true,
PersistentPeers: []string{addrs[0].String()},
}
nodeB := createTestService(t, configB)

// B should have connected to A during bootstrap
conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))
Comment on lines +110 to +112
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this, shouldn't len(conns) == 1, since nodeB should be connected to nodeA?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this just checks that it's greater than 0, there might be multiple conns open potentially


// if A disconnects from B, B should reconnect
nodeA.host.h.Network().ClosePeer(nodeA.host.id())
time.Sleep(time.Millisecond * 500)
conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))
}
51 changes: 33 additions & 18 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ var privateCIDRs = []string{

// host wraps libp2p host with network host configuration and services
type host struct {
ctx context.Context
h libp2phost.Host
dht *dual.DHT
bootnodes []peer.AddrInfo
protocolID protocol.ID
cm *ConnManager
ds *badger.Datastore
ctx context.Context
h libp2phost.Host
dht *dual.DHT
bootnodes []peer.AddrInfo
persistentPeers []peer.AddrInfo
protocolID protocol.ID
cm *ConnManager
ds *badger.Datastore
}

// newHost creates a host wrapper with a new libp2p host instance
Expand All @@ -78,6 +79,16 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

// format persistent peers
pps, err := stringsToAddrInfos(cfg.PersistentPeers)
if err != nil {
return nil, err
}

for _, pp := range pps {
cm.persistentPeers.Store(pp.ID, struct{}{})
}

// format protocol id
pid := protocol.ID(cfg.ProtocolID)

Expand Down Expand Up @@ -143,16 +154,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// wrap host and DHT service with routed host
h = rhost.Wrap(h, dht)

return &host{
ctx: ctx,
h: h,
dht: dht,
bootnodes: bns,
protocolID: pid,
cm: cm,
ds: ds,
}, nil
host := &host{
ctx: ctx,
h: h,
dht: dht,
bootnodes: bns,
protocolID: pid,
cm: cm,
ds: ds,
persistentPeers: pps,
}

cm.host = host
return host, nil
}

// close closes host services and the libp2p host (host services first)
Expand Down Expand Up @@ -221,14 +235,15 @@ func (h *host) addToPeerstore(p peer.AddrInfo) {
// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() {
failed := 0
for _, addrInfo := range h.bootnodes {
all := append(h.bootnodes, h.persistentPeers...)
for _, addrInfo := range all {
err := h.connect(addrInfo)
if err != nil {
logger.Debug("failed to bootstrap to peer", "error", err)
failed++
}
}
if failed == len(h.bootnodes) {
if failed == len(all) {
logger.Error("failed to bootstrap to any bootnode")
}
}
Expand Down
25 changes: 13 additions & 12 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,19 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi

// network service configuation
networkConfig := network.Config{
LogLvl: cfg.Log.NetworkLvl,
BlockState: stateSrvc.Block,
BasePath: cfg.Global.BasePath,
Roles: cfg.Core.Roles,
Port: cfg.Network.Port,
Bootnodes: cfg.Network.Bootnodes,
ProtocolID: cfg.Network.ProtocolID,
NoBootstrap: cfg.Network.NoBootstrap,
NoMDNS: cfg.Network.NoMDNS,
MinPeers: cfg.Network.MinPeers,
MaxPeers: cfg.Network.MaxPeers,
PublishMetrics: cfg.Global.PublishMetrics,
LogLvl: cfg.Log.NetworkLvl,
BlockState: stateSrvc.Block,
BasePath: cfg.Global.BasePath,
Roles: cfg.Core.Roles,
Port: cfg.Network.Port,
Bootnodes: cfg.Network.Bootnodes,
ProtocolID: cfg.Network.ProtocolID,
NoBootstrap: cfg.Network.NoBootstrap,
NoMDNS: cfg.Network.NoMDNS,
MinPeers: cfg.Network.MinPeers,
MaxPeers: cfg.Network.MaxPeers,
PublishMetrics: cfg.Global.PublishMetrics,
PersistentPeers: cfg.Network.PersistentPeers,
}

networkSrvc, err := network.NewService(&networkConfig)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/protobuf v1.25.0
)
Expand Down