Skip to content

Commit

Permalink
only remove entries from peerstore after a grace period of one minute
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Nov 26, 2021
1 parent 03203a8 commit 3ea7493
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ type BasicHost struct {
// keep track of resources we need to wait on before shutting down
refCount sync.WaitGroup

network network.Network
networkCtxCancel context.CancelFunc // the context is canceled once the network has shut down
network network.Network

mux *msmux.MultistreamMuxer
ids identify.IDService
hps *holepunch.Service
Expand Down Expand Up @@ -353,8 +355,10 @@ func (h *BasicHost) updateLocalIpAddr() {
// Start starts watchForAddrChanges tasks in the host
func (h *BasicHost) Start() {
h.refCount.Add(2)
ctx, cancel := context.WithCancel(context.Background())
h.networkCtxCancel = cancel
go h.watchForAddrChanges()
go h.gcPeerstore()
go h.gcPeerstore(ctx)
}

// newStreamHandler is the remote-opened stream handler for network.Network
Expand Down Expand Up @@ -522,23 +526,47 @@ func (h *BasicHost) watchForAddrChanges() {
}
}

func (h *BasicHost) gcPeerstore() {
// gcPeerstore removes disconnected peers from the peer store one minute after they have disconnected.
// The ctx controls the shutdown of this function.
// Don't use the host's context here. The context here must not be closed before the network has shut down.
func (h *BasicHost) gcPeerstore(ctx context.Context) {
defer h.refCount.Done()
sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
if err != nil {
log.Warnw("failed to listen for peer connectedness changed events", "error", err)
return
}
defer sub.Close()

m := make(map[peer.ID]time.Time)
ticker := time.NewTicker(20 * time.Second)
const gracePeriod = time.Minute // the time we keep old entries around
for {
// Note that this might shut down before the swarm has closed all connections.
select {
case <-h.ctx.Done():
case <-ctx.Done():
for p := range m {
h.Peerstore().RemovePeer(p)
}
return
case e := <-sub.Out():
ev := e.(event.EvtPeerConnectednessChanged)
if ev.Connectedness == network.NotConnected {
h.Peerstore().RemovePeer(ev.Peer)
p := ev.Peer
switch ev.Connectedness {
case network.NotConnected:
if _, ok := m[p]; !ok {
m[p] = time.Now()
}
case network.Connected:
// If we reconnect to the peer before we've cleared the information,
// keep it.
delete(m, p)
}
case now := <-ticker.C:
for p, disconnectTime := range m {
if now.Add(gracePeriod).Before(disconnectTime) {
h.Peerstore().RemovePeer(p)
delete(m, p)
}
}
}
}
Expand Down Expand Up @@ -1059,12 +1087,15 @@ func (h *BasicHost) Close() error {
_ = h.emitters.evtLocalAddrsUpdated.Close()
h.Network().Close()
_ = h.emitters.evtPeerConnectednessChanged.Close()
if h.networkCtxCancel != nil {
h.networkCtxCancel()
}

h.refCount.Wait()

if h.Peerstore() != nil {
h.Peerstore().Close()
}

h.refCount.Wait()
})

return nil
Expand Down

0 comments on commit 3ea7493

Please sign in to comment.