Skip to content

Commit

Permalink
feat(dot/network): implement persistent peers functionality (#1512)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot committed Apr 8, 2021
1 parent 98d1413 commit 7850532
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 72 deletions.
6 changes: 6 additions & 0 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot
cfg.NoMDNS = tomlCfg.NoMDNS
cfg.MinPeers = tomlCfg.MinPeers
cfg.MaxPeers = tomlCfg.MaxPeers
cfg.PersistentPeers = tomlCfg.PersistentPeers

// check --port flag and update node configuration
if port := ctx.GlobalUint(PortFlag.Name); port != 0 {
Expand Down Expand Up @@ -593,6 +594,10 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot
cfg.NoMDNS = true
}

if len(cfg.PersistentPeers) == 0 {
cfg.PersistentPeers = []string(nil)
}

logger.Debug(
"network configuration",
"port", cfg.Port,
Expand All @@ -602,6 +607,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot
"nomdns", cfg.NoMDNS,
"minpeers", cfg.MinPeers,
"maxpeers", cfg.MaxPeers,
"persistent-peers", cfg.PersistentPeers,
)
}

Expand Down
15 changes: 8 additions & 7 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ type AccountConfig struct {

// NetworkConfig is to marshal/unmarshal toml network config vars
type NetworkConfig struct {
Port uint32
Bootnodes []string
ProtocolID string
NoBootstrap bool
NoMDNS bool
MinPeers int
MaxPeers int
Port uint32
Bootnodes []string
ProtocolID string
NoBootstrap bool
NoMDNS bool
MinPeers int
MaxPeers int
PersistentPeers []string
}

// CoreConfig is to marshal/unmarshal toml core config vars
Expand Down
15 changes: 8 additions & 7 deletions dot/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ type AccountConfig struct {

// NetworkConfig is to marshal/unmarshal toml network config vars
type NetworkConfig struct {
Port uint32 `toml:"port,omitempty"`
Bootnodes []string `toml:"bootnodes,omitempty"`
ProtocolID string `toml:"protocol,omitempty"`
NoBootstrap bool `toml:"nobootstrap,omitempty"`
NoMDNS bool `toml:"nomdns,omitempty"`
MinPeers int `toml:"min-peers,omitempty"`
MaxPeers int `toml:"max-peers,omitempty"`
Port uint32 `toml:"port,omitempty"`
Bootnodes []string `toml:"bootnodes,omitempty"`
ProtocolID string `toml:"protocol,omitempty"`
NoBootstrap bool `toml:"nobootstrap,omitempty"`
NoMDNS bool `toml:"nomdns,omitempty"`
MinPeers int `toml:"min-peers,omitempty"`
MaxPeers int `toml:"max-peers,omitempty"`
PersistentPeers []string `toml:"persistent-peers,omitempty"`
}

// CoreConfig is to marshal/unmarshal toml core config vars
Expand Down
3 changes: 3 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Config struct {
MinPeers int
MaxPeers int

// PersistentPeers is a list of multiaddrs which the node should remain connected to
PersistentPeers []string

// privateKey the private key for the network p2p identity
privateKey crypto.PrivKey

Expand Down
65 changes: 39 additions & 26 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,29 @@ import (

// ConnManager implements connmgr.ConnManager
type ConnManager struct {
sync.Mutex
host *host
min, max int
disconnectHandler func(peer.ID)

// closeHandlerMap contains close handler corresponding to a protocol.
closeHandlerMap map[protocol.ID]func(peerID peer.ID)

protectedPeerMapMu sync.RWMutex
// protectedPeerMap contains a list of peers that are protected from pruning
// protectedPeers contains a list of peers that are protected from pruning
// when we reach the maximum numbers of peers.
protectedPeerMap map[peer.ID]struct{}
sync.Mutex
protectedPeers *sync.Map // map[peer.ID]struct{}

// persistentPeers contains peers we should remain connected to.
persistentPeers *sync.Map // map[peer.ID]struct{}
}

func newConnManager(min, max int) *ConnManager {
return &ConnManager{
min: min,
max: max,
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
protectedPeerMap: make(map[peer.ID]struct{}),
min: min,
max: max,
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
protectedPeers: new(sync.Map),
persistentPeers: new(sync.Map),
}
}

Expand Down Expand Up @@ -85,36 +89,23 @@ func (*ConnManager) TrimOpenConns(ctx context.Context) {}
// Protect peer will add the given peer to the protectedPeerMap which will
// protect the peer from pruning.
func (cm *ConnManager) Protect(id peer.ID, tag string) {
cm.protectedPeerMapMu.Lock()
defer cm.protectedPeerMapMu.Unlock()

cm.protectedPeerMap[id] = struct{}{}
cm.protectedPeers.Store(id, struct{}{})
}

// Unprotect peer will remove the given peer from prune protection.
// returns true if we have successfully removed the peer from the
// protectedPeerMap. False otherwise.
func (cm *ConnManager) Unprotect(id peer.ID, tag string) bool {
cm.protectedPeerMapMu.Lock()
defer cm.protectedPeerMapMu.Unlock()

_, ok := cm.protectedPeerMap[id]
if ok {
delete(cm.protectedPeerMap, id)
return true
}
return false
_, wasDeleted := cm.protectedPeers.LoadAndDelete(id)
return wasDeleted
}

// Close peer
func (*ConnManager) Close() error { return nil }

// IsProtected returns whether the given peer is protected from pruning or not.
func (cm *ConnManager) IsProtected(id peer.ID, tag string) (protected bool) {
cm.protectedPeerMapMu.RLock()
defer cm.protectedPeerMapMu.RUnlock()

_, ok := cm.protectedPeerMap[id]
_, ok := cm.protectedPeers.Load(id)
return ok
}

Expand All @@ -140,7 +131,7 @@ func (cm *ConnManager) ListenClose(n network.Network, addr ma.Multiaddr) {
func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
unprot := []peer.ID{}
for _, id := range peers {
if !cm.IsProtected(id, "") {
if !cm.IsProtected(id, "") && !cm.isPersistent(id) {
unprot = append(unprot, id)
}
}
Expand All @@ -159,6 +150,7 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
cm.Lock()
defer cm.Unlock()

// TODO: this should be updated to disconnect from (total_peers - maximum) peers, instead of just one peer
if len(n.Peers()) > cm.max {
unprotPeers := cm.unprotectedPeers(n.Peers())
if len(unprotPeers) == 0 {
Expand Down Expand Up @@ -188,6 +180,22 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
if cm.disconnectHandler != nil {
cm.disconnectHandler(c.RemotePeer())
}

if !cm.isPersistent(c.RemotePeer()) {
return
}

addrs := cm.host.h.Peerstore().Addrs(c.RemotePeer())
info := peer.AddrInfo{
ID: c.RemotePeer(),
Addrs: addrs,
}

err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
}

// TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers
}

Expand Down Expand Up @@ -224,3 +232,8 @@ func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) {
closeCB(s.Conn().RemotePeer())
}
}

func (cm *ConnManager) isPersistent(p peer.ID) bool {
_, ok := cm.persistentPeers.Load(p)
return ok
}
33 changes: 32 additions & 1 deletion dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package network
import (
"fmt"
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -64,7 +65,6 @@ func TestMaxPeers(t *testing.T) {

func TestProtectUnprotectPeer(t *testing.T) {
cm := newConnManager(1, 4)
require.Zero(t, len(cm.protectedPeerMap))

p1 := peer.ID("a")
p2 := peer.ID("b")
Expand All @@ -86,3 +86,34 @@ func TestProtectUnprotectPeer(t *testing.T) {
unprot = cm.unprotectedPeers([]peer.ID{p1, p2, p3, p4})
require.Equal(t, unprot, []peer.ID{p1, p2, p3, p4})
}

func TestPersistentPeers(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "node-a"),
Port: 7000,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
nodeA := createTestService(t, configA)

addrs := nodeA.host.multiaddrs()
configB := &Config{
BasePath: utils.NewTestBasePath(t, "node-b"),
Port: 7001,
RandSeed: 2,
NoMDNS: true,
PersistentPeers: []string{addrs[0].String()},
}
nodeB := createTestService(t, configB)

// B should have connected to A during bootstrap
conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))

// if A disconnects from B, B should reconnect
nodeA.host.h.Network().ClosePeer(nodeA.host.id())
time.Sleep(time.Millisecond * 500)
conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))
}
51 changes: 33 additions & 18 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ var privateCIDRs = []string{

// host wraps libp2p host with network host configuration and services
type host struct {
ctx context.Context
h libp2phost.Host
dht *dual.DHT
bootnodes []peer.AddrInfo
protocolID protocol.ID
cm *ConnManager
ds *badger.Datastore
ctx context.Context
h libp2phost.Host
dht *dual.DHT
bootnodes []peer.AddrInfo
persistentPeers []peer.AddrInfo
protocolID protocol.ID
cm *ConnManager
ds *badger.Datastore
}

// newHost creates a host wrapper with a new libp2p host instance
Expand All @@ -78,6 +79,16 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

// format persistent peers
pps, err := stringsToAddrInfos(cfg.PersistentPeers)
if err != nil {
return nil, err
}

for _, pp := range pps {
cm.persistentPeers.Store(pp.ID, struct{}{})
}

// format protocol id
pid := protocol.ID(cfg.ProtocolID)

Expand Down Expand Up @@ -143,16 +154,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// wrap host and DHT service with routed host
h = rhost.Wrap(h, dht)

return &host{
ctx: ctx,
h: h,
dht: dht,
bootnodes: bns,
protocolID: pid,
cm: cm,
ds: ds,
}, nil
host := &host{
ctx: ctx,
h: h,
dht: dht,
bootnodes: bns,
protocolID: pid,
cm: cm,
ds: ds,
persistentPeers: pps,
}

cm.host = host
return host, nil
}

// close closes host services and the libp2p host (host services first)
Expand Down Expand Up @@ -221,14 +235,15 @@ func (h *host) addToPeerstore(p peer.AddrInfo) {
// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() {
failed := 0
for _, addrInfo := range h.bootnodes {
all := append(h.bootnodes, h.persistentPeers...)
for _, addrInfo := range all {
err := h.connect(addrInfo)
if err != nil {
logger.Debug("failed to bootstrap to peer", "error", err)
failed++
}
}
if failed == len(h.bootnodes) {
if failed == len(all) {
logger.Error("failed to bootstrap to any bootnode")
}
}
Expand Down
25 changes: 13 additions & 12 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,19 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi

// network service configuation
networkConfig := network.Config{
LogLvl: cfg.Log.NetworkLvl,
BlockState: stateSrvc.Block,
BasePath: cfg.Global.BasePath,
Roles: cfg.Core.Roles,
Port: cfg.Network.Port,
Bootnodes: cfg.Network.Bootnodes,
ProtocolID: cfg.Network.ProtocolID,
NoBootstrap: cfg.Network.NoBootstrap,
NoMDNS: cfg.Network.NoMDNS,
MinPeers: cfg.Network.MinPeers,
MaxPeers: cfg.Network.MaxPeers,
PublishMetrics: cfg.Global.PublishMetrics,
LogLvl: cfg.Log.NetworkLvl,
BlockState: stateSrvc.Block,
BasePath: cfg.Global.BasePath,
Roles: cfg.Core.Roles,
Port: cfg.Network.Port,
Bootnodes: cfg.Network.Bootnodes,
ProtocolID: cfg.Network.ProtocolID,
NoBootstrap: cfg.Network.NoBootstrap,
NoMDNS: cfg.Network.NoMDNS,
MinPeers: cfg.Network.MinPeers,
MaxPeers: cfg.Network.MaxPeers,
PublishMetrics: cfg.Global.PublishMetrics,
PersistentPeers: cfg.Network.PersistentPeers,
}

networkSrvc, err := network.NewService(&networkConfig)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/protobuf v1.25.0
)
Expand Down

0 comments on commit 7850532

Please sign in to comment.