-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
remove peers from the peer store when they disconnect #1231
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ go 1.16 | |
require ( | ||
github.com/gogo/protobuf v1.3.2 | ||
github.com/libp2p/go-libp2p v0.14.4 | ||
github.com/libp2p/go-libp2p-core v0.11.0 | ||
github.com/libp2p/go-libp2p-core v0.11.1-0.20211024101752-b18a4c9c5629 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here too. |
||
github.com/libp2p/go-libp2p-discovery v0.6.0 | ||
github.com/libp2p/go-libp2p-kad-dht v0.15.0 | ||
github.com/libp2p/go-libp2p-mplex v0.4.1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ go 1.16 | |
require ( | ||
github.com/gdamore/tcell/v2 v2.1.0 | ||
github.com/libp2p/go-libp2p v0.14.1 | ||
github.com/libp2p/go-libp2p-core v0.11.0 | ||
github.com/libp2p/go-libp2p-core v0.11.1-0.20211024101752-b18a4c9c5629 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. amd here, lest we foeget. |
||
github.com/libp2p/go-libp2p-pubsub v0.6.0 | ||
github.com/rivo/tview v0.0.0-20210125085121-dbc1f32bb1d0 | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,13 +23,13 @@ require ( | |
github.com/libp2p/go-libp2p-autonat v0.6.0 | ||
github.com/libp2p/go-libp2p-blankhost v0.2.0 | ||
github.com/libp2p/go-libp2p-circuit v0.4.0 | ||
github.com/libp2p/go-libp2p-core v0.11.0 | ||
github.com/libp2p/go-libp2p-core v0.11.1-0.20211024101752-b18a4c9c5629 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here too. |
||
github.com/libp2p/go-libp2p-discovery v0.6.0 | ||
github.com/libp2p/go-libp2p-mplex v0.4.1 | ||
github.com/libp2p/go-libp2p-nat v0.1.0 | ||
github.com/libp2p/go-libp2p-netutil v0.1.0 | ||
github.com/libp2p/go-libp2p-noise v0.3.0 | ||
github.com/libp2p/go-libp2p-peerstore v0.4.0 | ||
github.com/libp2p/go-libp2p-peerstore v0.4.1-0.20211126123923-2767c2ed5a96 | ||
github.com/libp2p/go-libp2p-quic-transport v0.15.0 | ||
github.com/libp2p/go-libp2p-swarm v0.8.0 | ||
github.com/libp2p/go-libp2p-testing v0.5.0 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,8 +89,9 @@ type BasicHost struct { | |
negtimeout time.Duration | ||
|
||
emitters struct { | ||
evtLocalProtocolsUpdated event.Emitter | ||
evtLocalAddrsUpdated event.Emitter | ||
evtLocalProtocolsUpdated event.Emitter | ||
evtLocalAddrsUpdated event.Emitter | ||
evtPeerConnectednessChanged event.Emitter | ||
} | ||
|
||
addrChangeChan chan struct{} | ||
|
@@ -183,11 +184,10 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) { | |
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil { | ||
return nil, err | ||
} | ||
evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{}) | ||
if err != nil { | ||
if h.emitters.evtPeerConnectednessChanged, err = h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{}); err != nil { | ||
return nil, err | ||
} | ||
h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged)) | ||
h.Network().Notify(newPeerConnectWatcher(h.emitters.evtPeerConnectednessChanged)) | ||
|
||
if !h.disableSignedPeerRecord { | ||
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) | ||
|
@@ -350,10 +350,11 @@ func (h *BasicHost) updateLocalIpAddr() { | |
} | ||
} | ||
|
||
// Start starts background tasks in the host | ||
// Start starts watchForAddrChanges tasks in the host | ||
func (h *BasicHost) Start() { | ||
h.refCount.Add(1) | ||
go h.background() | ||
h.refCount.Add(2) | ||
go h.watchForAddrChanges() | ||
go h.gcPeerstore() | ||
} | ||
|
||
// newStreamHandler is the remote-opened stream handler for network.Network | ||
|
@@ -459,7 +460,7 @@ func (h *BasicHost) makeSignedPeerRecord(evt *event.EvtLocalAddressesUpdated) (* | |
return record.Seal(rec, h.signKey) | ||
} | ||
|
||
func (h *BasicHost) background() { | ||
func (h *BasicHost) watchForAddrChanges() { | ||
defer h.refCount.Done() | ||
var lastAddrs []ma.Multiaddr | ||
|
||
|
@@ -521,6 +522,28 @@ func (h *BasicHost) background() { | |
} | ||
} | ||
|
||
func (h *BasicHost) gcPeerstore() { | ||
defer h.refCount.Done() | ||
sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) | ||
if err != nil { | ||
log.Warnw("failed to listen for peer connectedness changed events", "error", err) | ||
return | ||
} | ||
defer sub.Close() | ||
for { | ||
// Note that this might shut down before the swarm has closed all connections. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this might be problematic with the ds peerstore, we might leave garbage behind. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's why I put the comment there. I'm a bit wary of building yet more shutdown logic (we'd need to have a |
||
select { | ||
case <-h.ctx.Done(): | ||
return | ||
case e := <-sub.Out(): | ||
ev := e.(event.EvtPeerConnectednessChanged) | ||
if ev.Connectedness == network.NotConnected { | ||
h.Peerstore().RemovePeer(ev.Peer) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will race with a reconnect and could leave us without peer info. Unfortunately, to do this right, we'll need to add and remove peer info from a single goroutine. But that could have some interesting perf impact? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we go to great length to avoid mutex contention, for example here: https://github.com/libp2p/go-libp2p-peerstore/blob/986d5ceedb842c664be0bbac66f598006392e2ff/pstoremem/protobook.go#L11-L20. An alternative approach (which @aschmahmann suggested above: #1231 (comment)) would be to somehow connect the peerstore with the host and have a |
||
} | ||
Comment on lines
+539
to
+542
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reason we're doing this instead of letting the peerstore's have their own GC methods since they can just listen on the eventbus? It's also not clear from here that this doesn't effect the AddrBook and that other peerstore implementations should ignore the AddrBook for RemovePeer. If we did remove peers from the AddrBook on disconect it would be the source of a lot of problems for earlier code written where this assumption wasn't true (e.g. dialing DHT servers we recently connected to, but are not currently). What do we get out of having the other subcomponents of the peerstore clear out immediately instead of on a timer, or at their own leisure? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The peerstore is currently a pretty independent object. It doesn't know anything about libp2p hosts, event busses, etc. (note that the constructor doesn't take any arguments: https://github.com/libp2p/go-libp2p-peerstore/blob/986d5ceedb842c664be0bbac66f598006392e2ff/pstoremem/peerstore.go#L20-L29). We could pass in an event bus there, but that would lead to a tighter coupling.
The
Currently, memory use is growing unboundedly. Clearing it immediately on disconnect seems the easiest way to prevent a DoS. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see what you're saying however this makes the The |
||
} | ||
} | ||
} | ||
|
||
// ID returns the (local) peer.ID associated with this Host | ||
func (h *BasicHost) ID() peer.ID { | ||
return h.Network().LocalPeer() | ||
|
@@ -627,7 +650,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I | |
}, nil | ||
} | ||
|
||
// Negotiate the protocol in the background, obeying the context. | ||
// Negotiate the protocol in the watchForAddrChanges, obeying the context. | ||
var selected string | ||
errCh := make(chan error, 1) | ||
go func() { | ||
|
@@ -1035,6 +1058,7 @@ func (h *BasicHost) Close() error { | |
_ = h.emitters.evtLocalProtocolsUpdated.Close() | ||
_ = h.emitters.evtLocalAddrsUpdated.Close() | ||
h.Network().Close() | ||
_ = h.emitters.evtPeerConnectednessChanged.Close() | ||
|
||
if h.Peerstore() != nil { | ||
h.Peerstore().Close() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will need a stable reference here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I'm waiting with cutting the release, just in case something else comes up that we need to get into v0.16.0. Will release and then rebase all the PRs.